什么是java线程池

Python032

什么是java线程池,第1张

线程是为了能够让计算机资源合理的分配,对于处理不同的任务创建不同的线程进行处理,但是计算机创建一个线程或者销毁一个线程所花费的也是比较昂贵的,有时候需要同时处理的事情比较多,就需要我们频繁的进行线程的创建和销毁,这样花费的时间也是比较多的。为了解决这一问题,我们就可以引用线程池的概念。

所谓线程池就是将线程集中管理起来,当需要线程的时候,可以从线程池中获取空闲的线程,这样可以减少线程的频繁创建与销毁,节省很大的时间和减少很多不必要的操作。

在java中提供了ThreadPoolExecutor类来进行线程的管理,这个类继承于AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口,我们可以使用ThreadPoolExecutor来进行线程池的创建。

在ThreadPoolExecutor的构造方法中,有多个参数,可以配置不同的参数来进行优化。这个类的源码构造方法为:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)其中每个参数代表的意义分别为

corePoolSize : 线程池中的核心线程数量,当线程池中当前的线程数小于这个配置的时候,如果有一个新的任务到来,即使线程池中还存在空闲状态的线程,程序也会继续创建一个新的线程放进线程池当中

maximumPoolSize: 线程池中的线程最大数量

keepAliveTime:当线程池中的线程数量大于配置的核心线程数量(corePoolSize)的时候,如果当前有空闲的线程,则当这个空闲线程可以存在的时间,如果在keepAliveTime这个时间点内没有新的任务使用这个线程,那么这个线程将会结束,核心线程不会结束,但是如果配置了allowCoreThreadTimeOut = true,则当空闲时间超过keepAliveTime之后,线程也会被结束调,默认allowCoreThreadTimeOut = false,即表示默认情况下,核心线程会一直存在于线程池当中。

unit : 空闲线程保持连接时间(keepAliveTime)的时间单位

workQueue:阻塞的任务队列,用来保存等待需要执行的任务。

threadFactory :线程工厂,可以根据自己的需求去创建线程的对象,设置线程的名称,优先级等属性信息。

handler:当线程池中存在的线程数超过设置的最大值之后,新的任务就会被拒绝,可以自己定义一个拒绝的策略,当新任务被拒绝之后,就会使用hander方法进行处理。

在java中也提供了Executors工具类,在这个工具类中提供了多个创建线程池的静态方法,其中包含newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor等。但是他们每个方法都是创建了ThreadPoolExecutor对象,不同的是,每个对象的初始 参数值不一样

线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了

本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码

//ThreadTask java

package polarman threadpool

/** *//**

*线程任务

* @author ryang

*

*/

public interface ThreadTask {

public void run()

}

//PooledThread java

package polarman threadpool

import java util Collectionimport java util Vector

/** *//**

*接受线程池管理的线程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector()

protected boolean running = false

protected boolean stopped = false

protected boolean paused = false

protected boolean killed = false

private ThreadPool pool

public PooledThread(ThreadPool pool) { this pool = pool

}

public void putTask(ThreadTask task) { tasks add(task)

}

public void putTasks(ThreadTask[] tasks) { for(int i= i<tasks lengthi++) this tasks add(tasks[i])

}

public void putTasks(Collection tasks) { this tasks addAll(tasks)

}

protected ThreadTask popTask() { if(tasks size() >) return (ThreadTask)tasks remove( )

else

return null

}

public boolean isRunning() {

return running

}

public void stopTasks() {

stopped = true

}

public void stopTasksSync() {

stopTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true

}

public void pauseTasksSync() {

pauseTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt()

else

killed = true

}

public void killSync() {

kill()

while(isAlive()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true

this notify()

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread()//System out println(Thread currentThread() getId() + : 空闲 )this wait()}else {

ThreadTask task

while((task = popTask()) != null) { task run()if(stopped) {

stopped = false

if(tasks size() >) { tasks clear()System out println(Thread currentThread() getId() + : Tasks are stopped )

break

}

}

if(paused) {

paused = false

if(tasks size() >) { System out println(Thread currentThread() getId() + : Tasks are paused )

break

}

}

}

running = false

}

if(killed) {

killed = false

break

}

}

}catch(InterruptedException e) {

return

}

//System out println(Thread currentThread() getId() + : Killed )

}

}

//ThreadPool java

package polarman threadpool

import java util Collectionimport java util Iteratorimport java util Vector

/** *//**

*线程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize

protected int initPoolSize

protected Vector threads = new Vector()

protected boolean initialized = false

protected boolean hasIdleThread = false

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSizethis initPoolSize = initPoolSize

}

public void init() {

initialized = true

for(int i= i<initPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize)

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize)this maxPoolSize = maxPoolSize

if(maxPoolSize <getPoolSize())

setPoolSize(maxPoolSize)

}

/** *//**

*重设当前线程数

* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size

return

}else if(size >getPoolSize()) { for(int i=getPoolSize()i<size &&i<maxPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

}else if(size <getPoolSize()) { while(getPoolSize() >size) { PooledThread th = (PooledThread)threads remove( )th kill()

}

}

//System out println( 重设线程数 线程数= + threads size())

}

public int getPoolSize() { return threads size()

}

protected void notifyForIdleThread() {

hasIdleThread = true

}

protected boolean waitForIdleThread() {

hasIdleThread = false

while(!hasIdleThread &&getPoolSize() >= maxPoolSize) { try { Thread sleep( )} catch (InterruptedException e) {

return false

}

}

return true

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator()itr hasNext()) { PooledThread th = (PooledThread)itr next()if(!th isRunning())

return th

}

if(getPoolSize() <maxPoolSize) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

return thread

}

//System out println( 线程池已满 等待 )

if(waitForIdleThread() == false)

return null

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread()

if(th != null) { th putTask(task)th startTasks()

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

}

下面是线程池的测试程序

//ThreadPoolTest java

import java io BufferedReaderimport java io IOExceptionimport java io InputStreamReader

import polarman threadpool ThreadPoolimport polarman threadpool ThreadTask

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 )System out println( task A 启动任务A 时长为 秒 )System out println( size 设置当前线程池大小为 )System out println( max 设置线程池最大线程数为 )System out println()

final ThreadPool pool = new ThreadPool( )pool init()

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in))

while(true) { try { String line = reader readLine()String words[] = line split( )if(words[ ] equalsIgnoreCase( quit )) { System exit( )}else if(words[ ] equalsIgnoreCase( size ) &&words length >= ) { try { int size = Integer parseInt(words[ ])pool setPoolSize(size)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) &&words length >= ) { try { int max = Integer parseInt(words[ ])pool setMaxPoolSize(max)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) &&words length >= ) { try { int timelen = Integer parseInt(words[ ])SimpleTask task = new SimpleTask(words[ ] timelen * )pool processTask(task)}catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace()

}

}

}

}

cmdThread start()

/**//*

for(int i= i<i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* )pool processTask(task)

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName

private int timeLen

public SimpleTask(String taskName int timeLen) { this taskName = taskNamethis timeLen = timeLen

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + )

try { Thread sleep(timeLen)} catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + )

}

}

使用此线程池相当简单 下面两行代码初始化线程池

ThreadPool pool = new ThreadPool( )pool init()

要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()

两行代码即可调用

lishixinzhi/Article/program/Java/hx/201311/27203

( )根据xml文件来管理线程池的最大最小线程数( )对线程池通过Timer定期扫描以防止线程未激活 ( )通过某一个变量(本程序中是freeThreadCount)来得到空闲线程的数目 一 配置xml(listen xml)是 <?xml version= encoding= UTF ?><config><ConsumeThreadPool><minPools></minPools> <! 线程池最小线程 ><maxPools></maxPools><! 线程池最大线程 ><checkThreadPeriod></checkThreadPeriod> <! 检查线程池中线程的周期 分钟 ></ConsumeThreadPool></config> 二 对于ConsumeThreadPoolPara的javabean: import java io *public class ConsumeThreadPoolPara implements Serializable{private int minPoolsprivate int maxPoolsprivate int checkThreadPeriodpublic int getMinPools(){return minPools}public int getMaxPools(){return maxPools}public int getCheckThreadPeriod(){return checkThreadPeriod}public void setMinPools(int minPools){this minPools = minPools}public void setMaxPools(int maxPools){this maxPools = maxPools}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod}public String toString(){return minPools+ + maxPools+ +checkThreadPeriod}public ConsumeThreadPoolPara() {}public static void main(String[] args) {ConsumeThreadPoolPara consumeThreadPool = new ConsumeThreadPoolPara()}} 三 解析xml程序代码(生成ConsumeThreadPoolPara) 使用jdom解析 import jdom *import jdom input SAXBuilderimport java io *import java util *public class ParseConfig {static Hashtable Listens = nullstatic ConnPara connpara = nullstatic ConsumeThreadPoolPara consumeThreadPoolPara = nullprivate static String configxml = listen xml static{getConsumeThreadPoolPara() //得到消费的线程池的参数}/*** 装载文档* @return 返回根结点* @throws JDOMException*/public static Element loadDocument() throws JDOMException{SAXBuilder parser = new SAXBuilder()// 新建立构造器try {Document document = parser build(configxml)Element root = document getRootElement()return root}catch(JDOMException e){logger error( listen xml文件格式非法! )throw new JDOMException()}}public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){if(consumeThreadPoolPara ==null){try {Element root = loadDocument()Element consumeThreadPool = root getChild( ConsumeThreadPool )if (consumeThreadPool != null) { //代表有数据库配置consumeThreadPoolPara = new ConsumeThreadPoolPara()Element minPools = consumeThreadPool getChild( minPools )consumeThreadPoolPara setMinPools(Integer parseInt(minPools getTextTrim()))Element maxPools = consumeThreadPool getChild( maxPools )consumeThreadPoolPara setMaxPools(Integer parseInt(maxPools getTextTrim()))Element checkThreadPeriod = consumeThreadPool getChild( checkThreadPeriod )consumeThreadPoolPara setCheckThreadPeriod(Integer parseInt(checkThreadPeriod getTextTrim()))}}catch (JDOMException e) {}}return consumeThreadPoolPara}} 四 线程池源代码 import java util */*** <p>Title: 线程池</p>* <p>Description: 采集消费模块</p>* <p>Copyright: Copyright (c) </p>* <p>Company: </p>* @author 张荣斌* @version */public class ThreadPool {private static int minPools = //最小连接池数目private static int maxPools = //最大连接池数目private static int checkThreadPeriod = //检查连接池的周期ArrayList m_ThreadList //工作线程列表LinkedList m_RunList = null //工作任务列表int totalThread =  //总线程数static int freeThreadCount =  //未被使用的线程数目private java util Timer timer = null //定时器static Object o = new Object()static{ //先初始化线程池的参数ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig getConsumeThreadPoolPara()if(consumeThreadPoolPara!=null){minPools = consumeThreadPoolPara getMinPools()maxPools = consumeThreadPoolPara getMaxPools()checkThreadPeriod = consumeThreadPoolPara getCheckThreadPeriod()* * }}public void setMinPools(int minPools){this minPools = minPools}public void setMaxPools(int maxPools){this maxPools = maxPools}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod}public ThreadPool() {m_ThreadList=new ArrayList()m_RunList=new LinkedList()for(int i= i<minPoolsi++){WorkerThread temp=new WorkerThread()totalThread = totalThread + m_ThreadList add(temp)temp start()try{Thread sleep( )}catch(Exception e){}}timer = new Timer(true) //启动定时器timer schedule(new CheckThreadTask(this) checkThreadPeriod)}/*** 当有一个工作来的时候启动线程池的线程* 当空闲线程数为 的时候 看总线程是否小于最大线程池的数目 就new一个新的线程 否则sleep 直到有空闲线程为止* 当空闲线程不为 则将任务丢给空闲线程去完成* @param work*/public synchronized void run(String work){if (freeThreadCount == ) {if(totalThread<maxPools){WorkerThread temp = new WorkerThread()totalThread = totalThread + m_ThreadList add(temp)temp start()synchronized(m_RunList){m_RunList add(work)m_RunList notify()}}else{while (freeThreadCount == ) {try {Thread sleep( )}catch (InterruptedException e) {}}synchronized(m_RunList){m_RunList add(work)m_RunList notify()}}} else {synchronized(m_RunList){m_RunList add(work)m_RunList notify()}}}/*** 检查所有的线程的有效性*/public synchronized void checkAllThreads() {Iterator lThreadIterator = erator()while (lThreadIterator hasNext()) { //逐个遍厉WorkerThread lTestThread = (WorkerThread) lThreadIterator next()if (! (lTestThread isAlive())) { //如果处在非活动状态时lTestThread = new WorkerThread()//重新生成个线程lTestThread start()//启动}}}/*** 打印调试信息*/public void printDebugInfo(){System out println( totalThread= +totalThread)System out println( m_ThreadList size()= +m_ThreadList size())}/**** <p>Title: 工作线程类</p>* @author 张荣斌* @version */class WorkerThread extends Thread{boolean running = trueString workpublic void run(){while(running){synchronized(o){freeThreadCount++}synchronized(m_RunList){while(m_RunList size() == ){try{m_RunList wait()if(!running) return}catch(InterruptedException e){}<lishixinzhi/Article/program/Java/gj/201311/27379