由浅入深了解Java进程池及线程池的怎样使用版权声明

原创
小哥 3年前 (2022-11-11) 阅读数 9 #大杂烩

前言

多线程异步执行可以最大化多核计算机的计算能力,但如果不加以控制,将给系统带来负担。线程本身也会占用内存空间,大量线程会占用内存资源并导致Out of Memory。即使没有这种情况,大量的线程回收也会给GC带来很大的压力。

为了避免重复创建线程,线程池的存在允许线程被重用。用外行的话说,当有工作要做时,将从线程池中取出一个线程。当工作完成时,线程将返回到线程池供其他任务使用,而不是直接关闭线程。

接下来,从整体到细节,一起探索线程池。

整体架构

来看Executor框架图:

接口:Executor,CompletionService,ExecutorService,ScheduledExecutorService

抽象类:AbstractExecutorService

实现类:ExecutorCompletionService,ThreadPoolExecutor,ScheduledThreadPoolExecutor

主要方法可以从图中看出,本文主要讨论了这一点。ThreadPoolExecutor

研读ThreadPoolExecutor

看看这个类的构造函数:

public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
        BlockingQueue paramBlockingQueue, ThreadFactory paramThreadFactory,
        RejectedExecutionHandler paramRejectedExecutionHandler) {
    this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
    this.mainLock = new ReentrantLock();
    this.workers = new HashSet();
    this.termination = this.mainLock.newCondition();
    if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
        throw new IllegalArgumentException();
    if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
        throw new NullPointerException();
    this.corePoolSize = paramInt1;
    this.maximumPoolSize = paramInt2;
    this.workQueue = paramBlockingQueue;
    this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
    this.threadFactory = paramThreadFactory;
    this.handler = paramRejectedExecutionHandler;
}

corePoolSize :线程池的核心池大小。创建线程池后,默认情况下线程池没有任何线程。

当任务结束时,它将创建一个线程来执行任务。换句话说,创建线程池后,线程池中的线程数为0当任务结束时,将创建一个线程来执行,直到达到线程数。corePoolSize 之后,到达的任务被放置在队列中。(注意是到达的任务)。换句话说,更精细:corePoolSize 表示线程池中允许同时运行的最大线程数。

如果执行线程池。prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize :线程池允许的最大线程数,表示可以创建的最大线程数量。maximumPoolSize它必须大于或等于corePoolSize。

keepAliveTime :指示没有任务时线程将持续多长时间,然后停止。默认情况下,只有线程池中的线程数大于corePoolSize 时,keepAliveTime 会起作用的。换句话说,当线程池中的线程数较大时corePoolSize达到线程空闲时间。keepAliveTime,那么就是shutdown。

Unit:keepAliveTime 的单位。

workQueue :一个阻塞队列,当线程池中的线程数超过其corePoolSize当,线程进入阻塞队列以阻塞等待。通过workQueue,线程池实现阻塞功能

threadFactory :线程工厂,用于创建线程。

handler :指示任务被拒绝时的策略。

任务缓存队列

我们之前多次提到任务缓存队列,例如。workQueue,用于存储等待执行的任务。

workQueue的类型为BlockingQueue,您通常可以选择以下三种类型:

1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,必须以指定的大小创建;

2)未绑定的任务队列LinkedBlockingQueue:基于列表的先进先出队列,如果在创建时未指定此队列大小,则默认为。Integer.MAX_VALUE;

3)直接提交队列synchronousQueue:此队列很特殊。它不会保存提交的任务,而是直接创建一个新线程来执行新任务。

拒绝策略

AbortPolicy:丢弃任务和投掷RejectedExecutionException

CallerRunsPolicy:只要线程池未关闭,策略就会直接在调用线程中运行当前丢弃的任务。显然,这不会真正丢弃任务,但任务提交线程的性能可能会急剧下降。

DiscardOldestPolicy:放弃队列中最旧的请求,即要执行的任务,然后再次尝试提交当前任务。

DiscardPolicy:放弃任务而不进行任何处理。

线程池的任务处理策略:

如果当前线程池中的线程数较少corePoolSize,将创建一个线程来执行每个任务;

如果当前线程池中的线程数>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;如果当前线程池中的线程数达到maximumPoolSize,将采用任务拒绝策略进行处理;

如果线程池中的线程数大于 corePoolSize如果线程空闲时间更长keepAliveTime,线程将被终止,直到线程池中的线程数不大于corePoolSize; 如果允许设置核心池中线程的生存时间,那么核心池中的线程将空闲更长时间keepAliveTime,线程也被终止。

线程池关闭

ThreadPoolExecutor提供了两个方法,用于线程池关闭,分别是shutdown()和shutdownNow(),其中:

shutdown():线程池不会立即终止,但在执行任务缓存队列中的所有任务之前不会终止,但不会再次接受新任务。

shutdownNow():立即终止线程池并尝试中断正在执行的任务,清除任务缓存队列并返回尚未执行的任务。

源码分析

首先,让我们看看核心execute方法,位于AbstractExecutorService未在中实现,来自Executor界面ThreadPoolExecutor只有这样,改变方法才得以实现,

ExecutorService中的submit(),invokeAll(),invokeAny()被称为execute方法,所以execute是核心的核心,源代码分析将围绕它逐步发展。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldnt, by returning false.
     * 如果运行的线程数较少corePoolSize,然后将呼叫。addWorker 方法创建新线程并将该任务作为新线程的第一个任务执行。

       当然,原子性质检查是在创建线程之前完成的,如果条件不允许,则不会创建线程来执行任务并返回。false.  

     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果一个任务成功地进入阻塞队列,那么我们需要执行一次双重检查,以确保我们添加了一个线程(因为有一些线程自上次检查以来已经死亡)或

       当我们进入方法时,线程池已经关闭。因此,我们将重新检查状态,在线程池关闭时回滚到队列中,并在线程池中没有线程时创建一个新线程。

    1. If we cannot queue task, then we try to add a new
  • thread. If it fails, we know we are shut down or saturated
  • and so reject the task.        如果任务无法排队(队列已满),那么我们将尝试打开一个新线程(从corepoolsize到扩充到maximum),如果失败,则可以确定原因,或者        线程池已关闭或饱和(达到maximum),所以我们执行拒绝策略。

     */

             // 1.当前线程数较少corePoolSize,创建并启动线程。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))         // 成功,返回

return; c = ctl.get(); }     // 2.步骤1如果失败,尝试进入阻塞队列, if (isRunning(c) && workQueue.offer(command)) {        // 成功进入队列,检查线程池状态,以及状态是否已部署RUNNING而且remove成功,任务被拒绝 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);        // 如果当前worker数量为0,通过addWorker(null, false)创建任务为null else if (workerCountOf(recheck) == 0) addWorker(null, false); }     // 3. 步骤1和2如果失败,请尝试拥有线程池的数量。corePoolSize扩充至maxPoolSize,如果失败,则拒绝该任务 else if (!addWorker(command, false)) reject(command); }

我相信阅读代码也是一种困惑,接下来,让我们谈谈他用流程图做了什么:

结合以上流程图逐行解析,首先是空指针检查,

wonrkerCountOf()该方法可以获得当前线程池中的线程总数,并将当前线程数与核心池大小进行比较,

  • 如果小于,将通过。addWorker()方法调度执行。
  • 如果它大于核心池大小,则将其提交到等待队列。
  • 如果进入等待队列失败,任务将直接提交到线程池。
  • 如果达到最大线程数,则提交失败并执行拒绝策略。

excute()向方法中添加任务的方法是使用addWorker()方法,查看源代码并一起学习。

private boolean addWorker(Runnable firstTask, boolean core) { retry:      // 外部循环用于确定线程池状态。 for (;;) { int c = ctl.get(); int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

       // 内层的循环,任务是worker数量加1 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }     // worker加1之后,下一个woker添加到HashSet然后开始。worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }

         // 如果往HashSet<Worker>如果添加成功,则启动线程。 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

addWorker(Runnable firstTask, boolean core)主要任务是创建和启动线程。

他将基于当前线程的状态和给定值(core or maximum)以确定是否可以创建线程。

addWorker有四种传输方法。execute分别使用了其中三种:

1.addWorker(paramRunnable, true)

线程数较少corePoolSize什么时候,需要交易。task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false.

2.addWorker(null, false)

放入空的task进workers Set,长度限制为maximumPoolSize。这样的task为空的worker当线程执行时,它将进入任务队列接受任务,这相当于创建一个新线程,但不会立即分配任务。

3.addWorker(paramRunnable, false)

当队列已满时,请尝试获取此新队列。task直接放入Workers Set,而此时Workers Set长度限制为maximumPoolSize。如果线程池已满,则返回。false.

事实也是如此execute()未使用该方法。

addWorker(null, true)

这个方法是null的task进Workers Set,且小于corePoolSize如果此时Set数字corePoolSize那就返回false,什么都不做。在实际使用中prestartAllCoreThreads()方法,用于预启动线程池。corePoolSize个worker等待从workQueue获取任务执行。

执行过程:

1,确定线程池当前是否可用于添加worker线程的状态,可以继续下一步,而不是return false:
A,线程池状态>shutdown,可能为stop、tidying、terminated,无法添加worker线程
B,线程池状态==shutdown,firstTask不为空,无法添加worker线程,因为shutdown状态的线程池不接收新任务。
C,线程池状态==shutdown,firstTask==null,workQueue为空,无法添加worker线程,因为firstTask空是指添加一个没有任务的线程,然后从。workQueue获取task,而workQueue为  空,表示添加无任务线程不再有意义。
2,线程池中的当前线程数是否超过上限(corePoolSize 或 maximumPoolSize),超过return false,如果不超过workerCount+1,继续下一步
3,在线程池中。ReentrantLock在保证下,Workers Set添加新创建的worker实例,添加后解锁,然后启动worker线程,如果这一切成功,return true,如果您添加worker入Set失败或启动失败,请致电addWorkerFailed()逻辑

四个通用线程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1); }

固定大小的线程池,您可以指定线程池、线程池的大小。corePoolSize和maximumPoolSize使用相等的阻塞队列。LinkedBlockingQueue,大小为整数最大值。

线程池中的线程数始终相同。提交新任务时,线程池中的空闲线程将立即执行。如果没有,它们将临时存储在阻塞队列中。对于固定大小的线程池,线程数没有变化。使用无界时LinkedBlockingQueue存储执行的任务。当非常频繁地提交任务时,LinkedBlockingQueue

快速增长,存在耗尽系统资源的问题。而且,当线程池空闲时,即当线程池中没有可运行的任务时,它不会释放工作线程,也会占用一定数量的系统资源,这是必需的。shutdown。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }

public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}

单线程线程池,只有一个线程的线程池,使用阻塞队列LinkedBlockingQueue,如果任何额外的任务被提交到线程池,它们将被临时存储在阻塞队列中,并在空闲时执行。按照先进先出的顺序执行任务。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}

缓存线程池,默认情况下缓存线程存活60秒。线程的核心池corePoolSize大小为0,核心池是最大的。Integer.MAX_VALUE,阻塞队列使用SynchronousQueue是直接提交的阻塞队列,    他总是强制线程池添加新线程以执行新任务。当没有任务要执行时,当线程的空闲时间超过keepAliveTime(60秒),工作线程将终止被回收,当提交新任务时,如果没有空闲线程,创建一个新线程来执行任务将导致一些开销。如果同时提交大量任务,并且任务执行时间不是特别快,那么线程池将添加等量的线程池处理任务,这很可能会很快耗尽系统资源。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int var0) { return new ScheduledThreadPoolExecutor(var0); }

public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
    return new ScheduledThreadPoolExecutor(var0, var1);
}

定时线程池,可用于定期执行任务,通常用于定期同步数据。

scheduleAtFixedRate:任务以固定的频率执行,周期是每个任务成功执行之间的间隔。

schedultWithFixedDelay:任务以固定延迟执行,这是指上次执行成功后到下一次执行开始前的时间。

使用实例

newFixedThreadPool实例:

View Code

newCachedThreadPool实例:

View Code

这里没有电话。shutDown方法,可在此处找到。60秒后,资源将自动释放。

newSingleThreadExecutor

View Code

这里需要注意的一点是,newSingleThreadExecutor和newFixedThreadPool类似地,当线程池中没有任务时,它不会释放系统资源,因此是必需的。shudown。

newScheduledThreadPool

View Code

最后杂谈

如何选择线程池的数量

线程池的大小决定了系统的性能,过大或过小的线程池数量无法实现最佳的系统性能。

当然,线程池的大小不需要太精确,只需要避免过大和过小的情况。通常,需要考虑确定线程池的大小。CPU数量、内存大小以及任务是计算密集型还是IO密集型和其他因素

NCPU = CPU的数量

UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1

W/C = 等待时间与计算时间之比

如果希望处理器达到所需的使用率,线程池的最佳大小为:

*线程池大小=NCPU UCPU(1+W/C)**

在Java中使用

int ncpus = Runtime.getRuntime().availableProcessors();

获取CPU的数量。

线程池工厂

Executors如果未指定线程工厂,将使用线程池。Executors中的DefaultThreadFactory,默认线程池工厂创建的线程是非守护进程线程。

使用自定义线程工厂可以做很多事情,例如跟踪线程池何时创建的线程数,或者自定义线程名称和优先级。如果

所有新创建的线程都设置为守护进程线程。当主线程退出时,线程池将被强制销毁。

下面的示例记录了线程的创建,并将所有线程设置为守护进程线程。

View Code

扩展线程池

ThreadPoolExecutor是可扩展的,它提供了几个可以在子类中重写的方法:beforeExecute,afterExecute和terimated。

将在执行任务的线程中调用。beforeExecute和afterExecute,这些方法还可以添加日志记录、计时、监视或统计收集,

它还可以用于输出有用的调试信息,以帮助系统诊断故障。下面是一个扩展线程池的示例:

View Code

正确使用线程池。

阿里编码规范中的以下段落:

不允许使用线程池。Executors创建,但ThreadPoolExecutor这样,处理方法使写作学生更清楚线程池的运行规则,避免了资源耗尽的风险。 说明:Executors每种方法的缺点:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是,堆叠的请求处理队列可能会消耗大量内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是最大线程数为Integer.MAX_VALUE,可能会创建大量线程,甚至OOM。

手动创建线程池时需要注意几点。

  1. 任务独立性。 任务如何依赖于其他任务,那么可能会出现死锁。例如,如果一个任务等待另一个任务的返回值或执行结果,除非线程池足够大,否则将发生线程饥饿死锁。

  2. 合理配置阻止时间过长的任务。 如果任务被阻塞太久,即使没有死锁,线程池的性能也会变得非常差。在里面Java并发包中的阻塞方法都定义了时间限制模式和时间限制模式。例如

Thread.join,BlockingQueue.put,CountDownLatch.await依此类推,如果任务超时,则确定任务失败,然后中止任务或将任务放回队列以供后续执行,这样,无论任务的最终结果是否成功,该方法都可以确保任务始终可以继续执行。

  1. 设置合理的线程池大小。 只需要避免太大或太小的情况。上述公式 *线程池大小=NCPU UCPU(1+W/C)** 。

  2. 选择适当的阻塞队列。 newFixedThreadPool和newSingleThreadExecutor所有这些都使用无限阻塞队列。无限阻塞队列消耗大量内存。如果使用有界阻塞队列,将避免内存过度使用的问题。然而,当任务填满有界阻塞队列时,新任务应该做什么?使用有界队列时,需要选择适当的拒绝策略,并且必须同时调整队列大小和线程池大小。对于非常大或无限的线程池,可以使用SynchronousQueue为了避免将任务排队以直接从生产者向工作线程提交任务。

下面是Thrift框架处理socket任务使用的线程池,您可以查看它。FaceBook工程师如何自定义线程池。

private static ExecutorService createDefaultExecutorService(Args args) {
    SynchronousQueue executorQueue = new SynchronousQueue();

    return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
            executorQueue);
}

Copyright ©2019 Janti

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除