深入Java线程池

@[toc]

一、什么是线程池


线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来以下下好处

  1. 降低资源消耗 ,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度 ,当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

二、Executor 框架


我们知道线程池就是线程的集合,下提供了集中管理、线程重用、降低资源消耗、提高响应速度等 从 JDK 1.5之后。为了把工作单元与执行机制分开,Executor 框架诞生了,他是一个用于统一创建与运行的接口。Executor 框架实现的就是线程池的功能。Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。


2.1 Executor 框架组成



1. 任务 (Runnable / Callable)

执行任务需要实现 Runnable 接口或者 Callable 接口

2. 任务等执行 (Executor)

包括任务执行机制的核心接口Executor,以及继承自Executor的 ExecutorService 接口。Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor


	ExecutorService executorService = Executors.newFixedThreadPool(5);
	

我们实现线程池通过 Executors 实现 ExecutorService 接口,ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口

ThreadPoolExecutor 类描述 :


	//AbstractExecutorService实现了ExecutorService接口
	public class ThreadPoolExecutor extends AbstractExecutorService 

	public abstract class AbstractExecutorService implements ExecutorService 
	

ScheduledThreadPoolExecutor 类描述:


	//继承ThreadPoolExecutor
	public class ScheduledThreadPoolExecutor
	        extends ThreadPoolExecutor
	        implements ScheduledExecutorService
	        

3. 异步计算的结果 (Future)

包括Future和实现Future接口的FutureTask类。


2.2 Executor 结构


在这里插入图片描述

  • Executor :是一个接口,他是Executor框架的基础,它将任务的提交与任务的执行分离。
  • ThreadPoolExecutor :是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExcecutor 是一个实现类,可以在给定等延迟后运行命令,或者定期执行命令,ScheduledThreadPoolExcecutoe比 Timer 更灵活,功能更强大
  • Future Future接口和它的实现FutureTask类,代表异步计算的结果。
  • ExecutorService :是一个线程池的实现

2.2 Executor 使用

  1. 祝线程首先要创建实现 Runnable 或者 Callable 接口的任务对象
	public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程 :" + Thread.currentThread().getName());
            }
        }).start();
    }
  1. 把创建完成实现 Runnable 或者 Callable 接口对象直接交给 ExecutorService 执行 ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行 ExecutorService.execute(Runnable command))或 ExecutorService 执行 ExecutorService.submit(Callable task))
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName());
            }
        });
        Future<?> submit = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName());
            }
        });
  
    }
  1. 如果 执行 ExecutorService.submit(…),将会返回一个 Future<?> 对象
  2. 祝线程可以执行 Future.get() 方法来等待任务执行完成 也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

三、ThreadPoolExecutor 解析


3.1 构造方法


	public ThreadPoolExecutor(int corePoolSize, //核心线程数
	                          int maximumPoolSize, //最大线程数
	                          long keepAliveTime, //线程数大于核心时,多余线程存活时间
	                          TimeUnit unit, //时间单位
	                          BlockingQueue<Runnable> workQueue, //工作队列
	                          ThreadFactory threadFactory, //线程工厂
	                          RejectedExecutionHandler handler //拒绝策略 ,线程过多的处理
	                          ) {
	        if (corePoolSize < 0 ||
	            maximumPoolSize <= 0 ||
	            maximumPoolSize < corePoolSize ||
	            keepAliveTime < 0)
	            throw new IllegalArgumentException();
	        if (workQueue == null || threadFactory == null || handler == null)
	            throw new NullPointerException();
	        this.acc = System.getSecurityManager() == null ?
	                null :
	                AccessController.getContext();
	        this.corePoolSize = corePoolSize;
	        this.maximumPoolSize = maximumPoolSize;
	        this.workQueue = workQueue;
	        this.keepAliveTime = unit.toNanos(keepAliveTime);
	        this.threadFactory = threadFactory;
	        this.handler = handler;
	    }

参数详解 :

  • corePoolSize :核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
  • maximumPoolSize :当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
  • keepAliveTime :当线程池中的线程数量大于 corePoolSize 的时候,如果这时候没有新的任务提交,核心线程外的线程不会立刻销毁,而是等待 时间超过来 keepAliveTime 才会被销毁
  • unitkeepAliveTime 参数的时间单位。
  • workQueue : 工作队列 , 当新任务来的时候会先判断当前运行的线程数量是否达到核心心线程数,如果达到的话,新任务就会被存放在队列中
  • threadFactory : 线程工厂,用来创建线程
  • handler :拒绝策略,提交的任务过多而不能及时处理时,我们可以定制策略来处理任务

在这里插入图片描述

为什么要这么设计呢 有了最大线程数,为什么要设计核心池大小呢?

  1. 如果当前线程池中的线程数 < corePoolSize 则每来一个任务,就会超级爱你一个线程去执行这个任务
  2. 如果当前线程池中的线程数 >= corePoolSize , 则每来一个任务,会将其添加到工作队列中,若添加成功,则等待 核心线程空闲将其取出执行,若添加失败 (一般队列已满)则在总数 不大于 maximumPoolSize 的前提下,创建新的线程
  3. 如果当前线程池中的线程数达到 maximumPoolSize ,则会才用拒绝策略进行处理
  4. 补充 : 如果当前线程池的数量大于 corePoolSize 时,如果某个线程空闲时间超过keepAliveTime ,线程将被销毁,直至线程池中的线程数目不大于 corePoolSize

ThreadPoolExecutor 拒绝策略

  • ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理
  • ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。
  • ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy :此策略将丢弃最早的未处理的任务请求

3.2 自定义 ThreadPoolExecutor


在 《阿里巴巴 JAVA 开发手册》明确指出线程资源利用线程池,线程池不允许使用 Executors 去创建

【强制】 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

【强制】 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:

  • =1) FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 >- 2) CachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
public class test {


    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 20;
    private static final Long KEEP_ALIVE_TIME = 1L;


    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.AbortPolicy()
        );
        for (int i = 0; i < 300; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:"+ Thread.currentThread().getName());
                    System.out.println("当前状态:"+ Thread.currentThread().getState());
                }
            });
        }
        //终止线程池
        threadPoolExecutor.shutdown();
    }



}

在这里插入图片描述 因为我们使用的 ThreadPoolExecutor.AbortPolicy 的拒绝任务,所以被抛出异常

OOM 案例:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("开始执行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小时开始");
                try {
                    TimeUnit.HOURS.sleep(1);
                }catch (Exception e){
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1,TimeUnit.HOURS);
    }

在这里插入图片描述


3.3 源码分析


线程池状态:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING:运行状态,接受新的任务并且处理队列中的任务。-
  • SHUTDOWN:关闭状态(调用了 shutdown 方法)。不接受新任务,,但是要处理队列 中的任务。
  • STOP:停止状态(调用了 shutdownNow 方法)。不接受新任务,也不处理队列中的 任务,并且要中断正在处理的任务。
  • TIDYING:所有的任务都已终止了,workerCount 为 0,线程池进入该状态后会调terminated() 方法进入 TERMINATED 状态。
  • TERMINATED:终止状态,terminated() 方法调用结束后的状态。
//记录线程池中线程状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//来获取当前线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }

//工作队列
private final BlockingQueue<Runnable> workQueue;



public void execute(Runnable command) {
		//如果任务为null,则抛出异常
        if (command == null)
            throw new NullPointerException();
        // 取的是记录线程状态
        int c = ctl.get();
        //判断当前线程池中之行的任务数量是否小于 corePoolSize
        if (workerCountOf(c) < corePoolSize) {
        //小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
        if (isRunning(c) && workQueue.offer(command)) {
        	// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 通过addWorker(command, false)新建一个线程
        // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

在这里插入图片描述

addWorker() 方法



// Lock锁
private final ReentrantLock mainLock = new ReentrantLock();

// 跟踪线程池的最大大小
private int largestPoolSize;

// 工作线程集合
private final HashSet<Worker> workers = new HashSet<>();

//获取线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
      return c < SHUTDOWN;
}

private boolean addWorker(Runnable firstTask, boolean core) {
		// CAS更新线程池数量
        retry:
        for (;;) {
			//获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查queue是否为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
            	//获取线程池中线程的数量
                int wc = workerCountOf(c);
                // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize 
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // //原子操作将workcount的数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
		// 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            	// 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
					// rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
					// 或者 rs == SHUTDOWN 传入的firstTask == null 添加新的worker
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        // 更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                	// 释放锁
                    mainLock.unlock();
                }
                // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 失败移除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

runWorker() 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 首先会通过run方法执行firstTask,执行完毕后会将task置为null
            // 那么task!=null的判断条件肯定不通过,它就会尝试通过getTask(),从任务队列中获取任务。
            while (task != null || (task = getTask()) != null) {
            	// 每一次任务的执行都必须获取锁来保证下方临界区代码的线程安全
                w.lock();
   				//如果状态值大于等于STOP(状态值是有序的,即STOP、TIDYING、TERMINATED)且当前线程还没有被中断,则主动中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	//执行任务前处理操作,默认是一个空实现;在子类中可以通过重写来改变任务执行前的处理行为
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	// 任务后处理
                        afterExecute(task, thrown);
                    }
                } finally {
                	//将task 变为null,已处理完成
                    task = null;
                    // ++操作,已完成任务数
                    w.completedTasks++;
                    //解锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	// 销毁当前的worker对象,并完成一些诸如完成任务数量统计之类的辅助性工作
            // 在线程池当前状态小于STOP的情况下会创建一个新的worker来替换被销毁的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask() 方法

private Runnable getTask() {
		// 通过timeOut变量表示线程是否空闲时间超时了
        boolean timedOut = false; // Did the last poll() time out?

		// 死循环
        for (;;) {
        	// 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果 线程池状态>=STOP 则直接减少一个worker计数并返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
			//  获取线程池中的worker计数
            int wc = workerCountOf(c);

            // 判断当前线程是否会被超时销毁
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

			// 如果当前线程数大于最大线程数 或者超时 或者阻塞队列为空 减少worker计数并返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            	//从阻塞队列中取出一个任务(如果队列为空会进入阻塞等待状态)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                 //	如果线程不等于null,直接返回
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

四、常用线程池


4.1 FixedThreadPool

  • FixedThreadPool : 定长线程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("当前线程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

当前线程pool-1-thread-1 当前线程pool-1-thread-3 当前线程pool-1-thread-2 当前线程pool-1-thread-1

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads, 固定长度

在这里插入图片描述 说明:

  1. 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务
  2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue
  3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行

弊端 :

FixedThreadPool 线程池使用的是 LinkedBlockingQueue 无界队列 ,(队列的容量为 Intger.MAX_VALUE),在线程任务多的情况下会导致 OOM


4.2 SingleThreadExecutor


  • SingleThreadExecutor : 一个单线程化的线程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("当前线程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

当前线程pool-1-thread-1 当前线程pool-1-thread-3 当前线程pool-1-thread-2 当前线程pool-1-thread-1

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
    }

从上面源代码可以看出新创建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被设置为 1, 固定长度,其他参数和 FixedThreadPool 相同。

在这里插入图片描述 说明:

  1. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务
  2. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
  3. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行

弊端 :

SingleThreadExecutor 线程池使用的是 LinkedBlockingQueue 无界队列 ,(队列的容量为 Intger.MAX_VALUE),在线程任务多的情况下会导致 OOM


4.3 CachedThreadPool

  • CachedThreadPool : 缓存线程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("当前线程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

当前线程pool-1-thread-1 当前线程pool-1-thread-3 当前线程pool-1-thread-2 当前线程pool-1-thread-1

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 的 corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

在这里插入图片描述

说明:

  1. 首先 SynchronousQueue 是一个生产消费模式等阻塞任务队列,只要有任务就需要有线程执行,线程池中等线程可以重复使用

弊端 :

CachedThreadPool 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。


4.3 ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor : 延迟线程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        for (int i = 1; i < 5; i++) {
            scheduledExecutorService.schedule(()->{
                System.out.println("当前线程"+ Thread.currentThread().getName());
                System.out.println("时间"+LocalDateTime.now());
            },3,TimeUnit.SECONDS);
        }
        scheduledExecutorService.shutdown();

    }
}

当前线程pool-1-thread-1 时间2021-04-14T00:11:12.125 当前线程pool-1-thread-1 时间2021-04-14T00:11:12.126 当前线程pool-1-thread-1 时间2021-04-14T00:11:12.126 当前线程pool-1-thread-1 时间2021-04-14T00:11:12.126

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务

ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。

在这里插入图片描述

说明:

  1. 延迟定时任务线程池,有点像我们的定时任务。同样,它也是一个无限大小的线程池 ,Integer.MAX_VALUE。它提供的调用方法比较多,包括:scheduleAtFixedRate、scheduleWithFixedDelay,可以按需选择延迟执行方式。



个人博客地址:http://blog.yanxiaolong.cn   | 『纵有疾风起,人生不言弃』

end
  • 作者:yxl(联系作者)
  • 发表时间:2021-04-14 14:01
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论



    联系我
    Blog信息
    我的Blog已营业: (*๓´╰╯`๓)
    备案号:蒙ICP备20002688号