Java 线程池

张贤 2020年03月16日 149次浏览

在 Java 中,一般都会利用 Executors 创建不同的线程池满足不同场景的需求。

Executors.newFixedThreadPool(int nThreads)

指定工作线程数量的线程池,每来一个任务,就创建一个工作线程,如果线程数打到了nThreads,就把任务存放进队列中排队,

Executors.newCachedThreadPool()

处理大量短时间工作任务的线程

  • 试图缓存线程并重用,当没有缓存线程可用时,就会创建新的工作线程
  • 如果线程闲置的时间超过阈值(一般是 60 S),那么这个线程就会被终止并移出缓存
  • 这种线程池在系统长时间闲置的时候,不会消耗什么资源
Executors.newSingleThreadExecutor()

这种线程池只会创建一个工作线程来执行任务,如果这个线程异常结束,会新建一个线程来取代它。这个线程池最大的特定是可以保证顺序地执行任务,在任意时间只有一个线程

Executors.newSingleThreadScheduledExecutor()
Executors.ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

定时或者周期性的工作调度,两者的区别在于单一线程还是多个线程

Executors.newWorkStealingPool()

这种线程池是 JDK8 开始引入的。内部会构建 ForkJoinPool,利用 work-stealing 算法,并行地处理任务,但是不保证处理顺序。
Fork Join 框架是 Java7 开始引入的并行执行任务的框架,类似于 Map Reduce。Fork Join 框架把一个大任务分割成若干个小任务,放到不同的队列中,并为每个队列创建一个单独的线程。那么就可能出现一种情况,有些队列的任务已经完成了,而有些队列的任务还没完成。所以为了提高效率,Fork Join 框架还使用了 work-stealing 算法,已经完成任务队列的空闲线程可以从其他队列里窃取任务来执行。在窃取任务时为了减少线程的竞争,所有的任务队列被设计为双端队列,队列原有的线程从队列一端拿任务执行,而窃取任务的线程从队列的另一端拿任务执行。

为什么要使用线程池:

  • 降低线程重复创建和销毁的消耗
  • 线程是有限的资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以提高线程的可管理性,进行统一的分配、调优和监控

Executor 框架
Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。其位于java.util.concurrent 包中。它提供了一种将”任务提交”与”任务运行”分离开来的机制。
包括如下三个接口:

  • Executor:运行新任务的简单接口,将任务提交和任务执行细节解耦
  • ExecutorService:具备管理执行器和任务生命周期的方法,提交任务的机制更加完善
  • ScheduledExecutorService:支持 Future 和定期执行任务


Executors 的 5 种创建线程最终都是创建了ThreadPoolExecutor对象

ThreadPoolExecutor 解析
ThreadPoolExecutor 的构造函数

  • corePoolSize:核心线程数,newFixedThreadPool 会将其设置为 nThreads ,而对于newCachedThreadPool则设为0。
  • maximumPoolSize:就是线程不够用时能够创建的最大线程数。newFixedThreadPool 设置为 nThreads,而 newCachedThreadPool则设置为 Integer.MAX_VALUE。
  • workQueue:任务等待队列,有很多可选的队列,不同的队列有不同的排队机制
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的数量大于corePoolSize时,而多余的线程又处于空闲状态,多余的线程不会立即销毁,而是会等待keepAliveTime设置的时间,如果还没有任务提交运行,才会被销毁
  • ThreadFactory:作用是创建新线程,默认使用的是Executors.defaultThreadFactory(),创建的是优先级相同的非守护线程。
  • handler:如果队列满了,并且没有空闲线程,继续提交新任务的处理策略,JDK 提供了四种策略:
    • AbortPolicy:直接抛出异常,这时默认策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务
    • DiscardOldestPolicy:丢弃队列中最靠前的任务,并执行当前任务
    • DiscardPolicy:直接丢弃任务
    • 也可以通过实现 RejectExecutionHandler 接口来自定义 handler

线程池会有一个队列负责接收用户提交的任务,然后将任务排队交给工作线程的集合,工作线程的集合负责管理工作线程的创建和销毁。在ThreadPoolExecutor内部有一个静态内部类Worker就是工作线程,并通过ThreadFactory来创建线程。

如果任务提交被拒绝(比如线程池已经处于shutdown 的状态),拒绝的任务需要有处理机制来处理,

新任务提交 execute 执行后的流程

  • 如果运行的线程少于corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的
  • 如果线程池中的线程数量大于corePoolSize且小于maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务
  • 如果设置的corePoolSizemaximumPoolSize,则创建的线程池的大小是固定的。如果这时有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
  • 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务

此处应该放一张线程池的执行流程图
把线程的状态值和有效线程数合二为一,存储在一个AtomicInteger变量ctl中,ctl的高 3 位是用来保存线程运行状态的,低 29 位使用来保存活动线程数量的。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

因此可以通过位与操作来获取线程运行状态和活动线程数量。高效且优雅

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

Java 线程池的运行状态

  • RUNNING:能够接受新提交的任务,并且也能够处理阻塞队列中的任务
  • SHUTDOWN:不再接受新提交的任务,但可以处理阻塞队列中的存量任务(调用shutdown()方法会进入这个状态)
  • STOP:不再接受新提交的任务,也不处理阻塞队列中的存量任务(调用shutdownNow()方法会进入这个状态)
  • TIDYING:所有的任务都已终止,此时有效的线程数位 0
  • TERMINATED:terminated() 方法执行完成后进入该状态

下面是线程池的状态转换图


线程池的大小如何选定
线程池的线程数量过多过少都会产生问题,合理地选择线程池大小是非常重要的。但是在实际场景中并没有固定的选择线程池大小的规则,下面是根据实际经验总结出的一些选择:

  • CPU 密集型任务:线程太多,频繁切换会带来许多不不必要的开销,所以线程数=CPU核数(CPU核数+1)
  • I/O 密集型任务:线程数=CPU核数*(1+平均等待时间/平均工作时间)