之所以存在线程池是基于以下两个原因:

  1. 线程的创建和销毁是需要有资源消耗的,多线程环境下频繁创建、销毁线程会影响系统性能
  2. 对于一个需要频繁创建任务、线程的应用来说,创建的任务数、线程数需要受到控制或管理

有了线程池,尤其是类似ThreadPoolExecutor这种可以通过参数调整其行为的线程池,可以近乎完美的解决上述两个问题。

线程池工作原理

简单来说线程池的工作原理就是:提前或者在执行任务的时候创建线程,执行完任务之后不销毁线程而是将线程归还到线程池中,后续有任务提交上来之后就可以不再创建线程、而是由线程池中空闲的线程执行任务。

这样一来就可以避免频繁创建和销毁线程,并且也可以控制线程池中线程的数量,同时如果提交任务的速度太快、线程池中的线程来不及执行任务的话,可以将任务放在队列中等待,等前面的任务执行完成、线程归还到线程池中之后,再从队列中获取任务继续执行。

其实以上就是ThreadPoolExecutor功能的简单描述。

当然ThreadPoolExecutor的功能要比这个描述强大的多也复杂的多。我们就从以下几个方面来详细分析一下ThreadPoolExecutor的功能和底层原理:

  1. 基本属性
  2. 任务队列
  3. 创建线程池
  4. 提交任务
  5. 执行任务
  6. 拒绝任务
  7. 钩子函数

基本属性

corePoolSize&maximumPoolSize:ThreadPoolExecutor有核心线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新任务提交后,如果当前线程数小于corePoolSize,即使线程池中有空闲线程,ThreadPoolExecutor也会立即创建一个线程去执行任务。如果当前线程数大于corePoolSize且小于maximumPoolSize,则只有队列满的情况下才会创建线程、否则任务入队列排队。

ctl:绑定了状态runState和线程数workerCount两个属性的AtomicInteger变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

理解ctl的工作原理是读懂ThreadPoolExecutor源码的必要前提。

先看两个辅助变量:

 private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

COUNT_BITS是Integer.size-3=31-3=29,CAPACITY是1向左位移29位后减1,用二进制表示就是:

0001 1111 1111 1111 1111 1111 1111 1111

~CAPACITY用二进制表示就是:

1110 0000 0000 0000 0000 0000 0000 0000

// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

 // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

runState有RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED等5个状态,压入ctl表示的时候全部要左移29位。意思是:ctl是按照2进制位来表达含义的,高位的3位用来表示状态runstate,低位的29位用来表示线程数workerCount。

ctl通过ctlOf函数(runState的实际状态值左移29位,移动到高3位后,和workerCount按位或操作)得到。

runStateOf函数:ctl和~CAPACITY进行按位与,~CAPACITY的二级制表示为:

1110 0000 0000 0000 0000 0000 0000 0000

按位与操作得到的就是ctl的高3位,对应的就是runState。

workerCountOf函数:ctl和CAPACITY进行按位与,CAPACITY的二进制表示为:

0001 1111 1111 1111 1111 1111 1111 1111

按位与得到的就是低29位,对应的就是workerCount。

Keep-alive:如果当前线程数超过了corePoolSize,那么超出的线程如果空闲时间超过了keep-alive会被回收(terminate)。核心线程是没有超时概念也不会被回收的,但是可以通过设置allowCoreThreadTimeOut为true,使得核心线程也受到参数Keep-alive控制从而被回收。

任务队列

创建线程池的时候,通过ThreadPoolExutor构造方法指定任务队列,可以支持任何BlockingQueue。通过Executors工具创建ThreadPoolExutor的话,支持SynchronousQueue、LinkedBlockingQueue和 ArrayBlockingQueue三种阻塞队列。

任务队列是ThreadPoolExutor的重要参数,与corePoolSize和MaxPoolSize配合使用会创建出表现完全不同的线程池:有界还是无界队列会影响到线程池接收任务的能力或表现,FIFO还是LIFO会影响到任务执行顺序,等等。

创建线程池

ThreadPoolExecutor提供了4个构造方法,但是如果你不打算替换默认的ThreadFactory和RejectedExecutionHandler的话,最常用的构造方法其实只有一个:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

调用到另外一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

从构造方法中我们可以看到,ThreadPoolExecutor创建后除了设置重要属性之外啥也没干,核心线程也并没有启动!

提交任务

默认情况下,ThreadPoolExecutor提交任务的过程也同时是创建线程的过程,因为缺省情况下ThreadPoolExecutor创建的时候并不创建线程。

image.png

ThreadPoolExecutor实现了Executor接口,Executor通过其唯一方法execute来提交任务。

ThreadPoolExecutor的execute方法接收一个Runable参数作为任务,按照如下逻辑完成任务的提交:

  1. 如果线程池的线程数量小于corePoolSize,则通过addWorker(command,true)创建并启动一个新线程来执行任务
  2. 否则,尝试将任务加入队列,如果成功,再次检查线程池状态,如果线程池已经停止运行则任务出队,拒绝任务。再次检查如果当前线程数为0则调用addWorker(null,false)创建新线程
  3. 否则,超出核心线程数且队列满,如果尚未超出最大线程数则通过addWorker(command,false)创建新线程,否则超出最大线程数,拒绝任务

源码比较简单,需要注意的是加入队列部分:

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

调用阻塞队列的非阻塞方法offer入队,想想为啥不用阻塞方法?

线程启动时机

默认情况下ThreadPoolExecutor创建之后不会启动任何线程,包括核心线程。所有线程都是在任务提交后启动。

可以通过调用prestartCoreThread()或prestartAllCoreThreads()随时启动一个或所有核心线程。

启动线程#addWorker

线程通过addWorker(Runnable firstTask, boolean core)方法启动。firstTask是该线程的第一个任务,firstTask=null表示只启动线程、无任务。core表示要启动的是核心线程、还是普通线程,用来判断线程数是否已达上限。

addWorker方法首先做必要的合法性判断:当前线程池状态,线程数是否已达上限等,满足启动条件则更新当前线程数WorkerCount。

然后创建线程对象Worker,获得锁,加锁操作:Worker加入workers缓存(worker存储当前线程池的所有尚未执行任务的线程)......操作完成之后,解锁,并启动线程

Worker对象

Worker是实现了Runnable接口的内部类,主要属性:

  1. thread:线程池的线程本尊
  2. firstTask: 线程创建时绑定的任务,该线程如果是任务提交的时候创建的,firstTask就是被提交的任务,如果线程创建成功,则firstTask具有优先执行权
  3. completedTasks:当前线程完成执行的任务数

初始化方法通过线程工厂创建一个线程:

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

this作为Runnable参数传递给Thread的构造方法,线程启动的时候就回调this的run方法,所以Worker的run方法就是线程池中的线程执行任务的入口方法。

执行任务Worker.run

run方法调用了runWorker方法:

  public void run() {
            runWorker(this);
        }

继续跟踪runWorker方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //有限执行firstTask,firstTask为空的话通过getTask()从队列中获取task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //调用任务执行前钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //调用任务执行后钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    //释放任务
                    task = null;
                    //当前线程执行任务数更新 +1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //线程池相关数据更新
            processWorkerExit(w, completedAbruptly);
        }
    }

从源码可以看到,Worker的firstTask会得到当前线程的优先执行,因为代码中获取并执行任务的循环条件中的task的初始值就是Worker的firstTask:

while (task != null || (task = getTask()) != null) {

firstTask执行完成之后,释放任务(task=null)。线程继续运行,下次循环时会通过getTask方法从队列获取任务。这个动作相当于:线程执行完一个任务之后并没有结束或销毁,而是交还给线程池,通过getTask继续从队列领任务,领到任务后继续执行。

getTask方法

getTask方法从队列获取排队等待执行的任务。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //如果当前线程池已停止,或者处于SHUTDOWN状态且队列为空则返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //线程数
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //是否需要限定时间
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果超过最大线程数且队列空,或等待超时且(线程数>1或者队列空),返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果需要限时,则通过限时参数从阻塞队列获取任务,否则不需要限时的话,阻塞直到获取到任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //从队列拿到任务就返回该任务
                if (r != null)
                    return r;
                //否则没有拿到任务,设置超时
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

我们发现线程池的keepAliveTime参数就是在这个getTask方法中生效的。

如果线程数小于核心线程数,并且allowCoreThreadTimeOut设置为false的话,线程不受等待任务时长的限制,则采用阻塞队列的take方法、无限期等待直到可以从队列中获取任务。

如果线程数大于核心线程数,或者参数设置核心线程也需要受到超时控制,就会设置获取任务的限时时长为keepAliveTime,如果在keepAliveTime时间范围内仍然没有从阻塞队列中拿到任务,则返回null。

超过keepAliveTime时长没拿到任务将导致在runWorker方法的while循环满足结束条件而退出循环:

while (task != null || (task = getTask()) != null) {
     //获取任务...
    } finally {
            processWorkerExit(w, completedAbruptly);
        }

退出循环后,调用processWorkerExit方法结束线程、退出线程池。

keepAliveTime=0L的情况

如果设置keepAliveTime=0L,并且线程数超出核心线程数,会是什么情况?

下面这一行代码交代的清清楚楚了:

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

调用poll方法、等待时间为0去阻塞队列获取任务,可以去看一下阻塞队列的限定等待时长的poll方法的源码,等待时长为0则效果等同于非阻塞方法:获取不到数据立刻返回null。

所以keepAliveTime=0L表示:超出核心线程数后,在执行完任务之后允许空闲时间为0!即:如果没有新的任务提交上来的话,只保留corePoolSize个线程继续留在线程池等待任务,其他线程立即销毁、退出线程池。

拒绝任务

采用有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的任务会被拒绝,拒绝后的处理方式由ThreadPoolExecutor的拒绝策略RejectedExecutionHandler决定。拒绝策略在ThreadPoolExecutor创建时指定。主要包括:

  1. AbortPolicy:直接抛异常RejectedExecutionException
  2. CallerRunsPolicy:调用方处理,即交给提交execute方法的线程自己执行任务
  3. DiscardPolicy:直接扔掉该任务
  4. DiscardOldestPolicy:扔掉队列中等待时间最久的任务,执行当前任务
  5. 自定义:实现RejectedExecutionHandler接口,自定义拒绝策略

钩子函数

任务执行前和任务执行后分别调用beforeExecute/afterExecute方法,这两个方法在ThreadPoolExecutor中默认都是哑实现,什么都没做。如果你既想要采用ThreadPoolExecutor作为线程池、又想在任务执行前后做额外的动作,可以继承ThreadPoolExecutor并覆盖他的beforeExecute/afterExecute方法。

线程池ThreadPoolExecutor源码分析完成!

Thanks a lot!

作者:|福|,原文链接: https://segmentfault.com/a/1190000043526490

文章推荐

Github Copilot Chat 初体验

SpringBoot 使用 Sa-Token 完成注解鉴权功能

Java设计模式-建造者模式

一文讲透 RocketMQ 消费者是如何负载均衡的

Java GenericObjectPool 对象池化技术--SpringBoot sftp 连...

深入理解 python 虚拟机:令人拍案叫绝的字节码设计

XSS(Cross Site Scripting)跨站脚本攻击

Next.js 实践:从 SSR 到 CSR 的优雅降级

记一次Elasticsearch GeoIpDownloader的启动

K8S 性能优化 - K8S APIServer 调优

关于TornadoFx和Android的全局配置工具类封装实现及思路解析

Spring Security 中的权限注解很神奇吗?