Java进程池原理和队列讲解转载

原创
小哥 3年前 (2022-10-28) 阅读数 58 #大杂烩

请注明转载来源: http://blog.csdn.net/xx326664162/article/details/51701508 文章来源: 薛璇的博客

你也可以查看我的其他类似文章,你也会有一定的收据!

线程池框架图:

一、ThreadPoolExecutor线程池实现类

ThreadPoolExecutor是线程池的核心类。首先看一下如何创建一个ThreadPoolExecutor。下面是ThreadPoolExecutor一种常见的施工方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
  • 1

看一下这个类中定义的重要变量,如下所示:

private final BlockingQueue workQueue;              // 阻塞队列  
private final ReentrantLock mainLock = new ReentrantLock();   // 互斥锁  
private final HashSet workers = new HashSet();// 线程集合.一个Worker对应于一个线程  
private final Condition termination = mainLock.newCondition();// 终止条件  
private int largestPoolSize;           // 线程池中曾经达到的最大线程数。  
private long completedTaskCount;       // 已完成的任务数  
private volatile ThreadFactory threadFactory;     // ThreadFactory对象,用于创建线程。  
private volatile RejectedExecutionHandler handler;// 拒绝策略的处理句柄。  
private volatile long keepAliveTime;   // 线程池维护线程允许的空闲时间。  
private volatile boolean allowCoreThreadTimeOut;  
private volatile int corePoolSize;     // 线程池维护最小数量的线程,即使它是空闲的。  
private volatile int maximumPoolSize;  // 线程池维护的最大线程数。  
  • 有几条重要的规则需要解释:

1、 corePoolSize、workQueue 、maximumPoolSize的关系

1当线程池刚刚创建时,其中没有一个线程。任务队列作为参数传入。但是,即使队列中有任务,线程池也不会立即执行它们。

2、当调用 execute() 当该方法添加任务时,线程池做出如下判断:

a. 如果正在运行的线程数量较少 corePoolSize,然后立即创建一个线程来运行此任务;

b. 如果正在运行的线程数大于或等于 corePoolSize,然后将此任务放入队列中。

c. 如果此时队列已满并且正在运行的线程数较少 maximumPoolSize,然后仍然希望创建一个线程来运行此任务;

d. 如果队列已满并且正在运行的线程数大于或等于 maximumPoolSize,则线程池抛出异常,告诉调用方“我不能再接受该任务”。

3当线程完成一项任务时,它会从队列中取出下一个任务来执行。

4当线程无事可做时,超过一定的时间(keepAliveTime),线程池确定当前运行的线程数量是否大于 corePoolSize,则此线程停止。因此,在线程池的所有任务完成后,它最终会缩小 corePoolSize 的大小。

这个过程表明,没有必要先添加任务,然后再执行它们。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,然后在加入时 20 执行顺序如下:首先执行任务。 1、2、3,然后是任务 4~13 被放入队列中。此时队列已满,任务 14、15、16 将立即执行,并且该任务 17~20 抛出一个异常。最终的顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。下面是一个使用线程池的示例:

2、workQueue 线程池使用的缓冲区队列。

缓冲区队列的长度决定了可以缓冲的最大缓冲区数。 三大共同战略

1、 直接提交。

工作队列的默认选项为 SynchronousQueue,它将任务直接提交给线程,而不保留它们。
在这里,如果没有可用于立即运行任务的线程,则将任务添加到队列将失败,因此将构造一个新线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。直接提交通常需要无约束 maximumPoolSizes 以避免拒绝新提交的任务。当命令连续到达的数量超过队列可以处理的平均数量时,此策略允许无限线程具有增长的潜力。;

2、 无界队列

使用无界队列(例如,没有预定义容量的队列)。 LinkedBlockingQueue)将导致所有 corePoolSize 当线程繁忙时,新任务将在队列中等待。这样,创建的线程数不会超过 corePoolSize。(因此,maximumPoolSize 的值也无效。)

当每个任务完全独立于其他任务时,也就是说,使用无界队列是合适的。任务执行互不影响; Web 在页面服务器中。此队列可用于处理瞬时突发请求,并且此策略允许在命令连续到达的平均值超过队列可以处理的范围时,无限线程增长。;

3、 有界队列

当使用受限的 maximumPoolSizes 当有界队列(例如, ArrayBlockingQueue)有助于防止资源枯竭,但可能更难调整和控制。

队列大小和最大池大小可能需要折衷:使用大队列和小池可以最小化 CPU 使用率、操作系统资源和上下文切换开销,但可能会导致手动吞吐量降低。如果任务频繁地被阻塞(例如,如果它们是 I/O 边界),则系统可能会调度超过您允许的线程数。使用小队列通常需要较大的池大小,CPU 高使用率,但可能会遇到不可接受的调度开销,这也会降低吞吐量。.

举例:

例1:使用直接提交策略,即E.SynchronousQueue。

首先SynchronousQueue是无限的,这意味着他存储任务的能力是无限的, 而是因为Queue它自身的特点,在添加了一定的元素后,必须等待其他线程拿走后才能继续添加。 这里要么是核心线程,要么是新创建的线程,但让我们想象一下下面的场景。

我们使用以下参数进行构造ThreadPoolExecutor:

new ThreadPoolExecutor(   
                2, 3, 30, TimeUnit.SECONDS,    
                new  SynchronousQueue(),    
                new RecorderThreadFactory("CookieRecorderPool"),    
                new ThreadPoolExecutor.CallerRunsPolicy()); 

假设当前核心线程已经具有2正在运行.

  1. 此时,一项任务(A),根据前述“如果正在运行的线程等于或大于 corePoolSize,则 Executor 总是倾向于在不添加新线程的情况下对请求进行排队。,所以A被添加到queue中。

  2. 另一项任务(B)和核心2这根线还没有穿好。下一步先试一试。1中所述,但由于使用SynchronousQueue,所以一定不能加入。

  3. 至此,上面提到的“如果无法将请求添加到队列中,则创建一个新线程”就满足了,因此不可避免地会创建一个新线程来运行该任务。

  4. 但如果这三项任务都没有完成,则继续执行一项任务,queue无法插入(任务A还在queueIn),而线程数达到maximumPoolSize,所以我们必须执行例外政策。
    为了避免这种情况: ,所以在使用中。SynchronousQueue通常要求maximumPoolSize是无界的(如果您想限制,则直接使用有界队列)。以供使用SynchronousQueue的作用jdk它清楚地写在: 此策略在处理可能具有内部依赖关系的请求集时避免锁定。

如果您的任务是A1,A2有内在的联系,A1需要先运行,然后提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必须首先在中执行。A1在它被执行之前,A2无法添加queue中

例2:使用无界队列策略,即E。LinkedBlockingQueue

拿newFixedThreadPool可以说,根据前面提到的规则:

  • 如果运行的线程较少 corePoolSize,则 Executor 在不排队的情况下添加新线程总是首选的。

那么,当任务持续增加时会发生什么呢?

  • 如果正在运行的线程等于或大于 corePoolSize,则 Executor 总是喜欢在不添加新线程的情况下对请求进行排队。

OK此时,该任务已加入队列。什么时候会添加新的线程?

  • 如果请求无法排队,则会创建一个新线程,除非在此线程之后创建该线程。 maximumPoolSize在这种情况下,该任务将被拒绝。

无界队列加入队列不会失败。 不像SynchronousQueue这有其自身的特点,对于无界队列,它总是可以添加的(当然,资源耗尽是另一回事)。

换句话说,它永远不会触发新的线程!线程的数量一直是corePoolSize大小。在忙于当前线程之后,从队列中取出任务并开始运行。如果任务长时间运行,添加任务的速度远远超过处理任务的速度,任务队列就会疯狂增长,任务内存很快就会爆炸。

示例3:有界队列,使用ArrayBlockingQueue。

这是最复杂的用法,所以JDK还有一些不建议这样做的原因。与上述相比,最大的特点是可以防止资源枯竭。

例如,请看以下构造方法:

new ThreadPoolExecutor(  
            2, 4, 30, TimeUnit.SECONDS,   
            new ArrayBlockingQueue(2),   
            new RecorderThreadFactory("CookieRecorderPool"),   
            new ThreadPoolExecutor.CallerRunsPolicy());  

假设所有的任务都永远不会完成。

  1. 首先来的A,B直接运行
  2. 如果来了C,D,他们将被放在queue中
  3. 如果你下次再来E,F,然后增加要运行的线程。E,F。最大线程数为4
  4. 但是,如果有另一个任务,队列不能再接受它,线程数量已经达到最大限制,所以将使用拒绝策略来处理它。

3、ThreadFactory

使用 ThreadFactory 创建一个新线程。如果未另行说明,在相同的 ThreadGroup 全部使用。 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级非守护程序状态

通过提供不同的 ThreadFactory,您可以更改线程的名称、线程组、优先级、守护进程状态等。如果来自 newThread 返回 null 时 ThreadFactory 如果无法创建线程,则执行器将继续运行,但不能执行任何任务。

public interface ThreadFactory {  
    Thread newThread(Runnable r);  
}

而在施工方法上threadFactory对象,则是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源代码如下:

public static ThreadFactory defaultThreadFactory() {  
     return new DefaultThreadFactory();  
 } 

在DefaultThreadFactory类实现ThreadFactory接口并实现了其中定义的方法,如下所示:

static class DefaultThreadFactory implements ThreadFactory {  
    private static final AtomicInteger poolNumber = new AtomicInteger(1);  
    private final ThreadGroup group;  
    private final AtomicInteger threadNumber = new AtomicInteger(1);  
    private final String namePrefix;  

    DefaultThreadFactory() {  
        SecurityManager s = System.getSecurityManager();  
        group = (s != null) ? s.getThreadGroup() :  Thread.currentThread().getThreadGroup();  
        namePrefix = "pool-" +  poolNumber.getAndIncrement() +  "-thread-";  
    }  
    // 为线程池创建新的任务执行线程。  
    public Thread newThread(Runnable r) {  
        // 与该线程对应的任务包括Runnable对象r  
        Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);  
        // 设置为非后台进程线程  
        if (t.isDaemon())  
            t.setDaemon(false);  
        // 设置优先级Thread.NORM_PRIORITY  
        if (t.getPriority() != Thread.NORM_PRIORITY)  
            t.setPriority(Thread.NORM_PRIORITY);  
        return t;  
    }  
}  

4、RejectedExecutionHandler

当Executor已关闭(即已执行executorService.shutdown()方法之后),以及Executor当有限边界用于最大线程和工作队列容量且饱和时,该方法execute()中提交的新任务将被拒绝.
在上述情况下,execute 方法将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四个预定义的处理程序策略:

  1. 在默认的 ThreadPoolExecutor.AbortPolicy 被拒绝的处理程序将引发运行库。 RejectedExecutionException;

  2. 在 ThreadPoolExecutor.CallerRunsPolicy线程调用 execute 它本身。该策略提供了一种简单的反馈控制机制,可以减缓新任务的提交速度。

  3. 在 ThreadPoolExecutor.DiscardPolicy 无法执行的任务将被删除;

  4. 在 ThreadPoolExecutor.DiscardOldestPolicy 如果执行器尚未关闭,则将删除工作队列最前面的任务,并重试执行器(如果再次失败,请重复此过程)。
    线程池将默认defaultHandler战略。第一眼defaultHandler的定义:

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 使用默认拒绝策略

  • 1

    public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    // 抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
    }

看看其他拒绝策略的具体实施。

class MyRunnable implements Runnable {  
    private String name;  
    public MyRunnable(String name) {  
        this.name = name;  
    }  
    @Override  
    public void run() {  
        try {  
            System.out.println(this.name + " is running.");  
            Thread.sleep(100);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}

举例:

如上所述,这是一个测试任务的示例,请写在下面。4要测试的测试用例。

1. DiscardPolicy 示例

public class DiscardPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建一个线程池。线程池。"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(CAPACITY));  
        // 设置线程池的拒绝策略。"丢弃"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());  

        // 新建10任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  
        // 关闭线程池  
        pool.shutdown();  
    }  
}  

线程池pool最大池大小“和”核心池大小“都是1(THREADS_SIZE)这意味着线程池可以同时运行的最大任务数只能是1”。
线程池pool阻塞队列是 ArrayBlockingQueue ,ArrayBlockingQueue是有界阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池阻塞队列只能有一个线程池阻塞等待。
根据《金融时报》中的分析,execute()代码显示线程池正在运行。2任务。不是的。1任务直接进入Worker在,通过线程来执行;2任务被放置在阻塞队列中等待。其他任务已被丢弃!

2. DiscardOldestPolicy 示例

public class DiscardOldestPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建一个线程池。线程池。"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue(CAPACITY));  
        // 设置线程池的拒绝策略。"DiscardOldestPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());  

        // 新建10任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  
        // 关闭线程池  
        pool.shutdown();  
    }  
}  

手术结果:

task-0 is running.
task-9 is running.

制定了“线程池拒绝策略”。DiscardPolicy修改为DiscardOldestPolicy之后,当任务被添加到线程池并被拒绝时,线程池在阻塞队列的末尾丢弃该任务,然后将被拒绝的任务添加到末尾。

3. AbortPolicy 示例

public class AbortPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建一个线程池。线程池。"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue(CAPACITY));  
        // 设置线程池的拒绝策略。"抛出异常"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());  

        try {  

            // 新建10任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {  
                Runnable myrun = new MyRunnable("task-"+i);  
                pool.execute(myrun);  
            }  
        } catch (RejectedExecutionException e) {  
            e.printStackTrace();  
            // 关闭线程池  
            pool.shutdown();  
        }  
    }  
}  

(某一次)手术结果:

java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
    at AbortPolicyDemo.main(AbortPolicyDemo.java:27)
task-0 is running.
task-1 is running.

制定了“线程池拒绝策略”。DiscardPolicy修改为AbortPolicy在此之后,当任务被添加到线程池时被拒绝时,将引发该任务。RejectedExecutionException。

4. CallerRunsPolicy 示例

public class CallerRunsPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建一个线程池。线程池。"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue(CAPACITY));  
        // 设置线程池的拒绝策略。"CallerRunsPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  

        // 新建10任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  

        // 关闭线程池  
        pool.shutdown();  
    }  
}  

(某一次)手术结果:

task-2 is running.
task-3 is running.
task-4 is running.
task-5 is running.
task-6 is running.
task-7 is running.
task-8 is running.
task-9 is running.
task-0 is running.
task-1 is running.

制定了“线程池拒绝策略”。DiscardPolicy修改为CallerRunsPolicy之后,当任务被添加到线程池并被拒绝时,线程池会将被拒绝的任务添加到“线程池运行线程”中运行。

二、Executor任务提交界面

Executor框架同java.util.concurrent.Executor 接口在Java 5被引入了。Executor框架是基于一组执行策略调用、调度、执行和控制异步任务的框架。Executor存在的目的是提供一种 “任务提交”和“任务如何运行” 不同的机制。定义如下:

public interface Executor {  
    void execute(Runnable command);  
} 

虽然只有一种方法,但它为灵活而强大的异步任务执行框架提供了基础。它提供了一种将任务的提交过程与执行过程分离并使用它的标准方法。Runnable代表这项任务。

三、ExecutorService任务周期管理界面

Executor的实现通常会创建线程来执行任务,但当使用异步方法执行任务时,由于之前提交的任务的状态不是立即可见的,如果要关闭应用程序,需要将受影响的任务状态返回给应用程序。

为了解决线程生命周期问题,EecutorService扩展了Eecutor接口,增加了一些生命周期管理的方法。具体情况如下:

public interface ExecutorService extends Executor {  
    void shutdown();  
    List shutdownNow();  
    boolean isShutdown();  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    // 省略部分方法  
}

四、Executors工具类

那么我们怎么才能Executor物体在哪里?这就是接下来要介绍的内容。Exectors类了。
Executors为创建Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些方法,类似于集合中的方法。Collections类的功能。

下面是Exectors用于创建线程池的类的一些方法:

1、newCachedThreadPool

它是一个线程池,具有无限数量的线程,并且只有非核心线程。

此线程池更适合于无需固定大小即可快速完成的小任务。它将为每个任务创建一个线程。

这样就可以直接创建线程对象(new Thread()有什么关系呢?请参见其第三个参数60L第四个参数TimeUnit.SECONDS你有没有?优点是60能够在几秒钟内重复使用已创建的线程。完毕60s回收空闲线程。

Integer.MAX_VALUE 是一个非常大的数字,它实际上等于可以任意大的最大线程数。

下面是Executors中的newCachedThreadPool()源代码:

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());  
    } 

2、 newFixedThreadPool

它是一个具有固定数量的线程的线程池。如果提交的任务数大于限制的最大线程数,则这些任务将排队。当一个线程的任务结束时,它们将继续按照调度策略等待下一个任务执行。

下面是Executors中的newFixedThreadPool()源代码:

public static ExecutorService newFixedThreadPool(int nThreads) {  
      return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());  
  } 

3、newSingleThreadExecutor

线程数为1的FixedThreadPool,如果提交了多个任务,则它们将被排队,每个任务将在下一个任务开始之前运行和结束,并且所有任务将使用相同的线程。下面是Executors中的newSingleThreadExecutor()源代码:

public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));  
    } 

通过为上面的线程池创建源代码,我们可以发现:
1> 除了CachedThreadPool使用用于直接提交策略的缓冲队列,FixedThreadPool和SingleThreadExecutor全部使用无界缓冲队列,创建的线程数不会超过 corePoolSize。
2> 使用了三个线程池。ThreadPoolExecutor构造方法是相同的,使用默认的ThreadFactory和handler:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();  

 public ThreadPoolExecutor(int corePoolSize,  
                     int maximumPoolSize,  
                     long keepAliveTime,  
                     TimeUnit unit,  
                     BlockingQueue workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
        Executors.defaultThreadFactory(), defaultHandler);  
} 

也就是说,三个线程池创建的线程对象都在同一组中,优先级是正常的。Thread.NORM_PRIORITY(5)非守护程序线程,使用的拒绝任务处理方法是直接抛出异常。AbortPolicy战略(如前所述)。

4、newScheduledThreadPool

核心线程的数量是固定的,
非核心线程的数量没有限制,当非核心线程空闲时,它们会立即被回收。
主要用于执行定时任务和固定周期的重复性任务。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

参考:
http://blog.csdn.net/mazhimazh/article/details/19243889
http://shift-alt-ctrl.iteye.com/blog/1840385
http://dongxuan.iteye.com/blog/901689
http://blog.csdn.net/sd0902/article/details/8395677
http://shift-alt-ctrl.iteye.com/blog/1840385
Java线程池的详细信息:ThreadPoolExecutor、Executors
引用 Java自包含线程池ThreadPoolExecutor详细说明和应用实例

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门