Java实现通用线程池

Python013

Java实现通用线程池,第1张

线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了

本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码

//ThreadTask java

package polarman threadpool

/** *//**

*线程任务

* @author ryang

*

*/

public interface ThreadTask {

public void run()

}

//PooledThread java

package polarman threadpool

import java util Collectionimport java util Vector

/** *//**

*接受线程池管理的线程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector()

protected boolean running = false

protected boolean stopped = false

protected boolean paused = false

protected boolean killed = false

private ThreadPool pool

public PooledThread(ThreadPool pool) { this pool = pool

}

public void putTask(ThreadTask task) { tasks add(task)

}

public void putTasks(ThreadTask[] tasks) { for(int i= i<tasks lengthi++) this tasks add(tasks[i])

}

public void putTasks(Collection tasks) { this tasks addAll(tasks)

}

protected ThreadTask popTask() { if(tasks size() >) return (ThreadTask)tasks remove( )

else

return null

}

public boolean isRunning() {

return running

}

public void stopTasks() {

stopped = true

}

public void stopTasksSync() {

stopTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true

}

public void pauseTasksSync() {

pauseTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt()

else

killed = true

}

public void killSync() {

kill()

while(isAlive()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true

this notify()

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread()//System out println(Thread currentThread() getId() + : 空闲 )this wait()}else {

ThreadTask task

while((task = popTask()) != null) { task run()if(stopped) {

stopped = false

if(tasks size() >) { tasks clear()System out println(Thread currentThread() getId() + : Tasks are stopped )

break

}

}

if(paused) {

paused = false

if(tasks size() >) { System out println(Thread currentThread() getId() + : Tasks are paused )

break

}

}

}

running = false

}

if(killed) {

killed = false

break

}

}

}catch(InterruptedException e) {

return

}

//System out println(Thread currentThread() getId() + : Killed )

}

}

//ThreadPool java

package polarman threadpool

import java util Collectionimport java util Iteratorimport java util Vector

/** *//**

*线程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize

protected int initPoolSize

protected Vector threads = new Vector()

protected boolean initialized = false

protected boolean hasIdleThread = false

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSizethis initPoolSize = initPoolSize

}

public void init() {

initialized = true

for(int i= i<initPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize)

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize)this maxPoolSize = maxPoolSize

if(maxPoolSize <getPoolSize())

setPoolSize(maxPoolSize)

}

/** *//**

*重设当前线程数

* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size

return

}else if(size >getPoolSize()) { for(int i=getPoolSize()i<size &&i<maxPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

}else if(size <getPoolSize()) { while(getPoolSize() >size) { PooledThread th = (PooledThread)threads remove( )th kill()

}

}

//System out println( 重设线程数 线程数= + threads size())

}

public int getPoolSize() { return threads size()

}

protected void notifyForIdleThread() {

hasIdleThread = true

}

protected boolean waitForIdleThread() {

hasIdleThread = false

while(!hasIdleThread &&getPoolSize() >= maxPoolSize) { try { Thread sleep( )} catch (InterruptedException e) {

return false

}

}

return true

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator()itr hasNext()) { PooledThread th = (PooledThread)itr next()if(!th isRunning())

return th

}

if(getPoolSize() <maxPoolSize) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

return thread

}

//System out println( 线程池已满 等待 )

if(waitForIdleThread() == false)

return null

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread()

if(th != null) { th putTask(task)th startTasks()

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

}

下面是线程池的测试程序

//ThreadPoolTest java

import java io BufferedReaderimport java io IOExceptionimport java io InputStreamReader

import polarman threadpool ThreadPoolimport polarman threadpool ThreadTask

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 )System out println( task A 启动任务A 时长为 秒 )System out println( size 设置当前线程池大小为 )System out println( max 设置线程池最大线程数为 )System out println()

final ThreadPool pool = new ThreadPool( )pool init()

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in))

while(true) { try { String line = reader readLine()String words[] = line split( )if(words[ ] equalsIgnoreCase( quit )) { System exit( )}else if(words[ ] equalsIgnoreCase( size ) &&words length >= ) { try { int size = Integer parseInt(words[ ])pool setPoolSize(size)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) &&words length >= ) { try { int max = Integer parseInt(words[ ])pool setMaxPoolSize(max)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) &&words length >= ) { try { int timelen = Integer parseInt(words[ ])SimpleTask task = new SimpleTask(words[ ] timelen * )pool processTask(task)}catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace()

}

}

}

}

cmdThread start()

/**//*

for(int i= i<i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* )pool processTask(task)

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName

private int timeLen

public SimpleTask(String taskName int timeLen) { this taskName = taskNamethis timeLen = timeLen

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + )

try { Thread sleep(timeLen)} catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + )

}

}

使用此线程池相当简单 下面两行代码初始化线程池

ThreadPool pool = new ThreadPool( )pool init()

要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()

两行代码即可调用

lishixinzhi/Article/program/Java/hx/201311/27203

new ThreadPoolExecutor(corePoolSize, maxinumPoolSize, keepAliveTime, milliseconds, runnableTaskQueue, handler)

参数简介:

当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其它空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不会再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

用于保存等待执行的任务的阻塞队列,可以有以下选择:

ArrayBlockingQueue :是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序

LinkedBlockingQueue :一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

SynchronousQueue :一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。

PriorityBlockingQueue :一个具有优先级的无限阻塞队列

线程池被允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。需要注意的是,如果使用了无界的阻塞队列,那么这个参数也就没什么效果。

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4中策略。

AbortPolicy :直接抛出异常

CallerRunsPolicy :只用调用者所在线程来运行任务

DiscardOldestPolicy :丢弃队列里最近的一个任务,并执行当前任务

DiscardPolicy :不处理,丢弃掉

线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,每个任务执行的时间比较短,可以调大时间,提高线程的利用率

可选单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)、纳秒(NANOSECONDS,千分之一微秒)

线程池是一种多线程处理形式,处理过程中将任务添加队列,然后在创建线程后自动启动这些任务,每个线程都使用默认的堆栈大小,以默认的优先级运行,并处在多线程单元中,如果某个线程在托管代码中空闲,则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后辅助线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才能启动。

java里面的线程池的顶级接口是Executor,Executor并不是一个线程池,而只是一个执行线程的工具,而真正的线程池是ExecutorService。

java中的有哪些线程池?

1.newCachedThreadPool创建一个可缓存线程池程

2.newFixedThreadPool 创建一个定长线程池

3.newScheduledThreadPool 创建一个定长线程池

4.newSingleThreadExecutor 创建一个单线程化的线程池

————————————————