β

java.util.concurrent并发包诸类概览

四火的唠叨 262 阅读

java.util.concurrent并发包诸类概览

java.util.concurrent包的类都来自于JSR-166:Concurrent Utilities,官方的描述叫做“The JSR proposes a set of medium-level utilities that provide functionality commonly needed in concurrent programs. ”。作者是大名鼎鼎的Doug Lea,这个包的前身可以在这里找到,它最好的文档就是系统的API手册

当然,这里参考的concurrent包来自JDK7,比最初JDK1.5的版本有了不少改进。我曾经在《Java多线程发展简史》提到过,对于Java并发本身,在基础的并发模型建立以后,JSR-133和JSR-166是贡献最大的两个,如觉必要,在阅读这篇文章之前,你可以先移步阅读这篇文章,能帮助在脑子里建立起最基础的Java多线程知识模型;此外,还有一篇是《从DCL的对象安全发布谈起》,这篇文章相当于是对JSR-133规范的阅读理解。

这篇文章中,我只是简要地记录类的功能和使用,希望可以帮助大家全面掌握或回顾Java的并发包。当然,任何不清楚的接口和功能,JDK的API手册是最好的参考材料,如果想更进一步,参透至少大部分类的实现代码,这会非常非常辛苦。

并发容器

这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区别,

  1. add方法和offer方法的区别在于超出容量限制时前者抛出异常,后者返回false;
  2. remove方法和poll方法都从队列中拿掉元素并返回,但是他们的区别在于空队列下操作前者抛出异常,而后者返回null;
  3. element方法和peek方法都返回队列顶端的元素,但是不把元素从队列中删掉,区别在于前者在空队列的时候抛出异常,后者返回null。

阻塞队列:

非阻塞队列:

转移队列:

其它容器:

同步设备

这些类大部分都是帮助做线程之间同步的,简单描述,就像是提供了一个篱笆,线程执行到这个篱笆的时候都得等一等,等到条件满足以后再往后走。

java.util.concurrent并发包诸类概览

给出一个Phaser使用的最简单的例子:

public class T {
    public static void main(String args[]) {
        final int count = 3;
        final Phaser phaser = new Phaser(count); // 总共有3个registered parties
        for(int i = 0; i < count; i++) {
            final Thread thread = new Thread(new Task(phaser));
            thread.start();
        }
    }
    
    public static class Task implements Runnable {
        private final Phaser phaser;

        public Task(Phaser phaser) {
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            phaser.arriveAndAwaitAdvance(); // 每执行到这里,都会有一个party arrive,如果arrived parties等于registered parties,就往下继续执行,否则等待
        }
    }
}

原子对象

这些对象都的行为在不使用同步的情况下保证了原子性。值得一提的有两点:

  1. weakCompareAndSet方法:compareAndSet方法很明确,但是这个是啥?根据JSR规范,调用weakCompareAndSet时并不能保证happen-before的一致性,因此允许存在重排序指令等等虚拟机优化导致这个操作失败(较弱的原子更新操作),但是从Java源代码看,它的实现其实和compareAndSet是一模一样的;
  2. lazySet方法:延时设置变量值,这个等价于set方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能损耗,所以如果不需要立即读取设置的新值,那么此方法就很有用。

class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull  = lock.newCondition();
  final Condition notEmpty = lock.newCondition();

  final Object[] items = new Object[100];
  int putptr, takeptr, count;

  public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
      while (count == items.length)
        notFull.await();
      items[putptr] = x;
      if (++putptr == items.length) putptr = 0;
      ++count;
      notEmpty.signal(); // 既然已经放进了元素,肯定不空了,唤醒“notEmpty”
    } finally {
      lock.unlock();
    }
  }

  public Object take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
        notEmpty.await();
      Object x = items[takeptr];
      if (++takeptr == items.length) takeptr = 0;
      --count;
      notFull.signal(); // 既然已经拿走了元素,肯定不满了,唤醒“notFull”
      return x;
    } finally {
      lock.unlock();
    }
  }
}

Fork-join框架

这是一个JDK7引入的并行框架,它把流程划分成fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,经常需要花费大量的时间去等待其他线程执行任务的完成,但是fork-join框架使用work stealing技术缓解了这个问题:

  1. 每个工作线程都有一个双端队列,当分给每个任务一个线程去执行的时候,这个任务会放到这个队列的头部;
  2. 当这个任务执行完毕,需要和另外一个任务的结果执行合并操作,可是那个任务却没有执行的时候,不会干等,而是把另一个任务放到队列的头部去,让它尽快执行;
  3. 当工作线程的队列为空,它会尝试从其他线程的队列尾部偷一个任务过来;
  4. 取得的任务可以被进一步分解。
class SortTask extends RecursiveAction {
    final long[] array;
    final int lo;
    final int hi;
    private int THRESHOLD = 30;

    public SortTask(long[] array) {
        this.array = array;
        this.lo = 0;
        this.hi = array.length - 1;
    }

    public SortTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD)
            sequentiallySort(array, lo, hi);
        else {
            int pivot = partition(array, lo, hi);
            coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array,
                pivot + 1, hi));
        }
    }

    private int partition(long[] array, int lo, int hi) {
        long x = array[hi];
        int i = lo - 1;
        for (int j = lo; j < hi; j++) {
            if (array[j] <= x) {
                i++;
                swap(array, i, j);
            }
        }
        swap(array, i + 1, hi);
        return i + 1;
    }

    private void swap(long[] array, int i, int j) {
        if (i != j) {
            long temp = array[i];
            array[i] = array[j];
            array[j] = temp;
        }
    }

    private void sequentiallySort(long[] array, int lo, int hi) {
        Arrays.sort(array, lo, hi + 1);
    }
}

测试的调用代码:

@Test
public void testSort() throws Exception {
    ForkJoinTask sort = new SortTask(array);
    ForkJoinPool fjpool = new ForkJoinPool();
    fjpool.submit(sort);
    fjpool.shutdown();

    fjpool.awaitTermination(30, TimeUnit.SECONDS);

    assertTrue(checkSorted(array));
}

RecursiveTask和RecursiveAction的区别在于它的compute是可以有返回值的,子任务的计算使用fork()方法,结果的获取使用join()方法:

class Fibonacci extends RecursiveTask {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    private int compute(int small) {
        final int[] results = { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 };
        return results[small];
    }

    public Integer compute() {
        if (n <= 10) {
            return compute(n);
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        Fibonacci f2 = new Fibonacci(n - 2);
        f1.fork();
        f2.fork();
        return f1.join() + f2.join();
    }
}

执行器和线程池

这个是我曾经举过的例子:

public class FutureUsage {
 
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
 
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
 
                Thread.sleep(4000);
 
                Object result = "finished";
                return result;
            }
        };
 
        Future<Object> future = executor.submit(task);
        System.out.println("task submitted");
 
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
 
        // Thread won't be destroyed.
    }
}

线程池具备这样的优先级处理策略:

  1. 请求到来首先交给coreSize内的常驻线程执行
  2. 如果coreSize的线程全忙,任务被放到队列里面
  3. 如果队列放满了,会新增线程,直到达到maxSize
  4. 如果还是处理不过来,会把一个异常扔到RejectedExecutionHandler中去,用户可以自己设定这种情况下的最终处理策略

对于大于coreSize而小于maxSize的那些线程,空闲了keepAliveTime后,会被销毁。观察上面说的优先级顺序可以看到,假如说给ExecutorService一个无限长的队列,比如LinkedBlockingQueue,那么maxSize>coreSize就是没有意义的。

java.util.concurrent并发包诸类概览

ExecutorService:

ScheduledExecutor:

CompletionService:

其它:

文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接《四火的唠叨》

分享到:
你可能也喜欢:
作者:四火的唠叨
一个纯正程序员的啰嗦
原文地址:java.util.concurrent并发包诸类概览, 感谢原作者分享。

发表评论