所谓线程池就是将线程集中管理起来,当需要线程的时候,可以从线程池中获取空闲的线程,这样可以减少线程的频繁创建与销毁,节省很大的时间和减少很多不必要的操作。
在java中提供了ThreadPoolExecutor类来进行线程的管理,这个类继承于AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口,我们可以使用ThreadPoolExecutor来进行线程池的创建。
在ThreadPoolExecutor的构造方法中,有多个参数,可以配置不同的参数来进行优化。这个类的源码构造方法为:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)其中每个参数代表的意义分别为
corePoolSize : 线程池中的核心线程数量,当线程池中当前的线程数小于这个配置的时候,如果有一个新的任务到来,即使线程池中还存在空闲状态的线程,程序也会继续创建一个新的线程放进线程池当中
maximumPoolSize: 线程池中的线程最大数量
keepAliveTime:当线程池中的线程数量大于配置的核心线程数量(corePoolSize)的时候,如果当前有空闲的线程,则当这个空闲线程可以存在的时间,如果在keepAliveTime这个时间点内没有新的任务使用这个线程,那么这个线程将会结束,核心线程不会结束,但是如果配置了allowCoreThreadTimeOut = true,则当空闲时间超过keepAliveTime之后,线程也会被结束调,默认allowCoreThreadTimeOut = false,即表示默认情况下,核心线程会一直存在于线程池当中。
unit : 空闲线程保持连接时间(keepAliveTime)的时间单位
workQueue:阻塞的任务队列,用来保存等待需要执行的任务。
threadFactory :线程工厂,可以根据自己的需求去创建线程的对象,设置线程的名称,优先级等属性信息。
handler:当线程池中存在的线程数超过设置的最大值之后,新的任务就会被拒绝,可以自己定义一个拒绝的策略,当新任务被拒绝之后,就会使用hander方法进行处理。
在java中也提供了Executors工具类,在这个工具类中提供了多个创建线程池的静态方法,其中包含newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor等。但是他们每个方法都是创建了ThreadPoolExecutor对象,不同的是,每个对象的初始 参数值不一样
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run()
}
//PooledThread java
package polarman threadpool
import java util Collectionimport java util Vector
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector()
protected boolean running = false
protected boolean stopped = false
protected boolean paused = false
protected boolean killed = false
private ThreadPool pool
public PooledThread(ThreadPool pool) { this pool = pool
}
public void putTask(ThreadTask task) { tasks add(task)
}
public void putTasks(ThreadTask[] tasks) { for(int i= i<tasks lengthi++) this tasks add(tasks[i])
}
public void putTasks(Collection tasks) { this tasks addAll(tasks)
}
protected ThreadTask popTask() { if(tasks size() >) return (ThreadTask)tasks remove( )
else
return null
}
public boolean isRunning() {
return running
}
public void stopTasks() {
stopped = true
}
public void stopTasksSync() {
stopTasks()
while(isRunning()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true
}
public void pauseTasksSync() {
pauseTasks()
while(isRunning()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt()
else
killed = true
}
public void killSync() {
kill()
while(isAlive()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true
this notify()
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread()//System out println(Thread currentThread() getId() + : 空闲 )this wait()}else {
ThreadTask task
while((task = popTask()) != null) { task run()if(stopped) {
stopped = false
if(tasks size() >) { tasks clear()System out println(Thread currentThread() getId() + : Tasks are stopped )
break
}
}
if(paused) {
paused = false
if(tasks size() >) { System out println(Thread currentThread() getId() + : Tasks are paused )
break
}
}
}
running = false
}
if(killed) {
killed = false
break
}
}
}catch(InterruptedException e) {
return
}
//System out println(Thread currentThread() getId() + : Killed )
}
}
//ThreadPool java
package polarman threadpool
import java util Collectionimport java util Iteratorimport java util Vector
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize
protected int initPoolSize
protected Vector threads = new Vector()
protected boolean initialized = false
protected boolean hasIdleThread = false
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSizethis initPoolSize = initPoolSize
}
public void init() {
initialized = true
for(int i= i<initPoolSizei++) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize)
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize)this maxPoolSize = maxPoolSize
if(maxPoolSize <getPoolSize())
setPoolSize(maxPoolSize)
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size
return
}else if(size >getPoolSize()) { for(int i=getPoolSize()i<size &&i<maxPoolSizei++) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
}
}else if(size <getPoolSize()) { while(getPoolSize() >size) { PooledThread th = (PooledThread)threads remove( )th kill()
}
}
//System out println( 重设线程数 线程数= + threads size())
}
public int getPoolSize() { return threads size()
}
protected void notifyForIdleThread() {
hasIdleThread = true
}
protected boolean waitForIdleThread() {
hasIdleThread = false
while(!hasIdleThread &&getPoolSize() >= maxPoolSize) { try { Thread sleep( )} catch (InterruptedException e) {
return false
}
}
return true
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator()itr hasNext()) { PooledThread th = (PooledThread)itr next()if(!th isRunning())
return th
}
if(getPoolSize() <maxPoolSize) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
return thread
}
//System out println( 线程池已满 等待 )
if(waitForIdleThread() == false)
return null
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread()
if(th != null) { th putTask(task)th startTasks()
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread()
if(th != null) { th putTasks(tasks)th startTasks()
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread()
if(th != null) { th putTasks(tasks)th startTasks()
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReaderimport java io IOExceptionimport java io InputStreamReader
import polarman threadpool ThreadPoolimport polarman threadpool ThreadTask
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 )System out println( task A 启动任务A 时长为 秒 )System out println( size 设置当前线程池大小为 )System out println( max 设置线程池最大线程数为 )System out println()
final ThreadPool pool = new ThreadPool( )pool init()
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in))
while(true) { try { String line = reader readLine()String words[] = line split( )if(words[ ] equalsIgnoreCase( quit )) { System exit( )}else if(words[ ] equalsIgnoreCase( size ) &&words length >= ) { try { int size = Integer parseInt(words[ ])pool setPoolSize(size)}catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) &&words length >= ) { try { int max = Integer parseInt(words[ ])pool setMaxPoolSize(max)}catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) &&words length >= ) { try { int timelen = Integer parseInt(words[ ])SimpleTask task = new SimpleTask(words[ ] timelen * )pool processTask(task)}catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace()
}
}
}
}
cmdThread start()
/**//*
for(int i= i<i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* )pool processTask(task)
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName
private int timeLen
public SimpleTask(String taskName int timeLen) { this taskName = taskNamethis timeLen = timeLen
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + )
try { Thread sleep(timeLen)} catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + )
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( )pool init()
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixinzhi/Article/program/Java/hx/201311/27203