/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set //表示线程池的常驻核心数,如果设置为0,则表示在没有任何任务,销毁线程池;如果大于0,即使没有任务,也会保证线程池的线程数量等于此值。如果设置过小,频繁的创建和销毁,设置过大,浪费系统资源。 * @param maximumPoolSize the maximum number of threads to allow in the * pool 必须大于0,大于corePoolSize。此值只有在任务比较多,且不能存放在任务队列时,才能用到。 * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. 表示线程的存活时间,当线程池空闲时间并且超过了此时间,多余的线程就会销毁到线程池的核心线程数的数量为止。 * @param unit the time unit for the {@code keepAliveTime} argument keepAliveTime的时间单位 * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. 线程池的任务队列,线程池的所有线程都在处理任务的时候,新任务缓存到这里排队等到执行 * @param threadFactory the factory to use when the executor * creates a new thread 通常在创建线程池时不指定此参数,会使用默认的线程创建工厂的模式,创建线程。 * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached 线程池的拒绝策略,如果workQueue存储满之后,不能创建新的线程来执行任务,就会使用拒绝策略,属于限流保护机制 * @throws IllegalArgumentException if one of the following holds: * {@code corePoolSize < 0} * {@code keepAliveTime < 0} * {@code maximumPoolSize <= 0} * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { // Executors.defaultThreadFactory() 为默认的线程创建工厂 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory();}// 默认的线程创建工厂,需要实现 ThreadFactory 接口static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } // 创建线程 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); // 创建一个非守护线程 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值 return t; }}
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当前工作的线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 创建新的线程执行此任务 if (addWorker(command, true)) return; c = ctl.get(); } // 检查线程池是否处于运行状态,如果是则把任务添加到队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再出检查线程池是否处于运行状态,防止在第一次校验通过后线程池关闭 // 如果是非运行状态,则将刚加入队列的任务移除 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会发生) else if (workerCountOf(recheck) == 0) addWorker(null, false); // 新建线程执行任务 } // 核心线程都在忙且队列都已爆满,尝试新启动一个线程执行失败 else if (!addWorker(command, false)) // 执行拒绝策略 reject(command);}
addWorker(Runnable firstTask, boolean core)
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(20));// execute 使用executor.execute(new Runnable() { @Override public void run() { System.out.println("Hello, execute."); }});// submit 使用Futurefuture = executor.submit(new Callable () { @Override public String call() throws Exception { System.out.println("Hello, submit."); return "Success"; }});System.out.println(future.get());
ThreadPoolExecutor executor=new ThreadPoolExecutor(1,3,10,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 6; i++) { executor.execute(()->{ System.out.println(Thread.currentThread().getName()); }); }
pool-1-thread-2pool-1-thread-1pool-1-thread-3pool-1-thread-1pool-1-thread-3Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.tangtang.boot.launch.LeetCodeSolution.ThreadExample3$$Lambda$1/1496724653@48533e64 rejected from java.util.concurrent.ThreadPoolExecutor@64a294a6[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 5] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.example.tangtang.boot.launch.LeetCodeSolution.ThreadExample3.main(ThreadExample3.java:9)
ThreadPoolExecutor executor=new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("这是自定义的拒绝策略"); } }); for (int i = 0; i <15; i++) { executor.execute(()->{ System.out.println(Thread.currentThread().getName()); }); }
public class ThreadPoolExtend { public static void main(String[] args) { MyThreadPoolExecutor executor=new MyThreadPoolExecutor(2,4,10, TimeUnit.SECONDS,new LinkedBlockingQueue()); for (int i = 0; i < 3; i++) { executor.execute(()->{ Thread.currentThread().getName(); }); } } static class MyThreadPoolExecutor extends ThreadPoolExecutor{ private final ThreadLocallocalTime = new ThreadLocal<>(); public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { Long sTime = System.nanoTime();// 开始时间 (单位:纳秒) localTime.set(sTime); System.out.println(String.format("%s | before | time=%s",t.getName(),sTime)); super.beforeExecute(t,r); } @Override protected void afterExecute(Runnable r,Throwable t) { Long eTime = System.nanoTime();// 开始时间 (单位:纳秒) Long totalTime = eTime - localTime.get(); // 执行总时间 System.out.println(String.format("%s | after | time=%s | 耗时:%s 毫秒",Thread.currentThread().getName(),eTime,(totalTime/1000000.0))); super.afterExecute(r,t); } }
pool-1-thread-1 | before | time=38938852458000
pool-1-thread-2 | before | time=38938852540300 pool-1-thread-1 | after | time=38938882198300 | 耗时:29.7403 毫秒 pool-1-thread-2 | after | time=38938882205900 | 耗时:29.6656 毫秒 pool-1-thread-1 | before | time=38938883845000 pool-1-thread-1 | after | time=38938884124100 | 耗时:0.2791 毫秒