J.U.C(java.util.concurrent)包的梳理

张贤 2020年03月17日 162次浏览

java.util.concurrent提供了并发编程的解决方案,主要包括两大块:

  • CAS:是java.util.concurrent.atomic包的基础
  • AQS:是java.util.concurrent.locks包以及一些常用类(比如 Semophore、ReentrantLock)的基础

J.U.C 包的分类:

  • 线程执行器:executor
  • 锁:locks
  • 原子变量类:atomic
  • 并发工具类:tools
  • 并发集合:collections

并发工具类

  • CountDownLatch:让主线程等待一组事件发生后继续执行。主线程设置一个 CountDownLatch,然后主线程调用 CountDownLatch 对象的 await() 方法等待计数器,子线程调用 CountDownLatch 对象的 countDown() 方法将计数器减1。直到变为 0,主线程才继续执行
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.go();
    }

    private void go() throws InterruptedException {
        //设置一个 CountDownLatch,计数器的值为 3
        CountDownLatch latch = new CountDownLatch(3);
        //依次创建 3 个线程,并启动
        new Thread(new Task(latch), "Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(latch), "Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(latch), "Thread3").start();
        latch.await();//等待子线程调用 countDown() 方法,直到计数器为0
        System.out.println("所有线程已到达,主线程开始执行:" + Thread.currentThread().getName());
    }

    class Task implements Runnable {
        private CountDownLatch latch;

        public Task(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
            //每调用一次 countDown(),CountDownLatch 中计数器的值就会减1
            latch.countDown();
        }
    }
}
  • CyclicBarrier:阻塞自己当前线程,等待其他线程,所有线程必须同时到达后,当前线程才能继续执行。所有线程到达后,可以触发执行另一个预先设置的线程。和 CountDownLatch 的区别是:在计数器不为 0 时,CountDownLatch 不会阻塞子线程;而 CyclicBarrier 会阻塞所有线程,直到计数器为 0,所有线程才继续执行
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.go();
    }

    private void go() throws InterruptedException {
        //设置一个 CyclicBarrier,计数器的值为 3
        CyclicBarrier barrier = new CyclicBarrier(3);
        //依次创建 3 个线程,并启动
        new Thread(new Task(barrier), "Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(barrier), "Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(barrier), "Thread3").start();
    }

    class Task implements Runnable {
        private CyclicBarrier barrier;

        public Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println("线程: " + Thread.currentThread().getName() + "已经到达");

            try {
                //计数器减1,并阻塞,直到计数器为 0,才继续往下执行
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程: " + Thread.currentThread().getName() + "开始处理");
        }
    }
}
  • Semaphore:控制某个资源可同时被访问的线程数量。通过 acquire() 方法访问资源,如果信号量满了就阻塞,直到可访问。使用 release() 释放资源
public class SemaphoreDemo {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        //设置信号量,最多只能 5 个线程同时访问
        Semaphore semaphore = new Semaphore(5);
        //模拟 20 个客户端同时访问
        for (int i = 0; i < 20; i++) {
            final int NUMBER = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("Accessing:" + NUMBER);
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            };
            service.execute(runnable);
        }
        //关闭线程池
        service.shutdown();
    }
}
  • Exchanger:两个线程到达同步点后,相互交换数据
public class ExchangerDemo {
    static Exchanger<String> exchanger = new Exchanger();

    public static void main(String[] args) {
        //代表男生和女生
        ExecutorService service = Executors.newFixedThreadPool(2);
        service.execute(() -> {
            //女生对男生说的话
            try {
                String girl = exchanger.exchange("我其实暗恋你很久了....");
                System.out.println("女生说:" + girl);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.execute(() -> {

            try {
                System.out.println("女生慢慢地从教室里走出来...");
                TimeUnit.SECONDS.sleep(2);
                //男生对女生说的话
                String boy = exchanger.exchange("我很喜欢你");
                System.out.println("男生说:" + boy);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

并发集合:collections

  • BlockingQueue:提供可阻塞的入队和出队操作。主要用于生产者和消费者模式,在多线程场景下生产者在队尾添加元素,而消费者线程则在队头消费元素,通过这种方式能够隔离任务的生产和消费。有 7 个实现类
    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。先进先出,初始化容量后就不可改变
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列或者无界阻塞队列。先进先出,如果初始化时不指定大小,则默认大小为 Integer.MAX_VALUE,可以认为是无界的。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
    • DelayQueue:一个支持延迟获取元素的无界阻塞队列
    • SynchronousQueue:仅允许容纳一个元素。当一个线程添加元素后会被阻塞,直到这个元素被另一个线程消费掉
    • LinkedTransferQueue:一个由链表组成的无界阻塞队列。相当于 SynchronousQueue + LinkedBlockingQueue。性能比 LinkedBlockingQueue 高,比 SynchronousQueue 能存储更多的元素
    • LinkedBlockingDQueue:一个由链表组成的双向阻塞队列。在 Fork Join 中用到,每个工作线程都有自己的任务队列,比生产者消费者模型有更好的伸缩性。如果一个工作线程的任务队列消费完了,可以窃取其他任务队列尾部的任务,进一步减少竞争