ThreadPoolExecutor创建新线程的条件:
- coreThreadCount < 已创建线程数 < maxThreadCount, 或者 已创建线程数 < maxThreadCount 且 阻塞队列已满
因此如果 使用无界阻塞队列,那么线程数将只会达到 coreThreadCount(如果coreThreadCount == 0,那么会创建一个线程),而不会继续创建到 maxThreadCount.
例如:ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());事实上这个pool只会创建一个线程,这个pool事实上是串行执行的.
可以参见 ThreadPoolExecutor.execute(Runnable)方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
java 1.6 以后可以如此解决:
// 注意,这里 coreThreadCount 设置为和 maxThreadCount 一样大ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());// 这个的作用是使得coreThread和非coreThread采用一样的idle超时destroy策略.pool.allowCoreThreadTimeOut(true);
注意: 这样做的特点是
- 提交一个task后,只要当前的 线程数(pool size)小于 core size,那么就会创建一个新的Worker Thread(原因参见上面列出的execute方法实现).
- 就上面的例子来看,假设目前线程池里面已有4个线程,那么再次提交一个任务后(execute或者submit),线程池都会创建一个新的Worker Thread,
即使那4个线程目前处于idle状态
.如果这不符合你的要求,可以采用下面的方案:java 1.5 及以前
- allowCoreThreadTimeOut(true) 的作用是当线程空闲时间超过设置的idle时间后,就会销毁该线程,即使当前线程数小于core size.因此空闲一段时间后,线程池中的线程将会减少到0.
java 1.5 及以前:
private static class SpecialBlockingQueueextends LinkedBlockingQueue { private ThreadPoolExecutor executor; public void setExecutor(ThreadPoolExecutor executor) { synchronized (this) { if(this.executor != null) { throw new IllegalStateException("You can only call setExecutor() once!"); } if(executor == null) { throw new NullPointerException("executor argument can't be null!"); } this.executor = executor; } } @Override public boolean offer(T t) { synchronized (this) { if(executor == null) { throw new NullPointerException("you must call setExecutor() before use it!"); } int poolSize = executor.getPoolSize(); int activeSize = executor.getActiveCount(); int maxSize = executor.getMaximumPoolSize(); if(activeSize == poolSize && poolSize < maxSize) { return false; // let ThreadPoolExecutor create a new Thread } } return super.offer(t); }}// 注意,这里 coreThreadCount 就可以直接设置为 0SpecialBlockingQueue specialBlockingQueue = new SpecialBlockingQueue<>();ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, 5, 2, TimeUnit.SECONDS, specialBlockingQueue, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("ThreadPoolExecutor-" + t.getId() + "-" + System.identityHashCode(t)); return t; }}, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace();// will never reach here } }});specialBlockingQueue.setExecutor(tpe);
备注
- 如果线程池使用比较频繁,那么可能会出现Thread刚被销毁就需要创建的情况,可以考虑使用
Executors.newFixedThreadPool()
- 如果希望线程池中的线程数可以降为0.(非 daemon 的Thread将会阻止JVM退出)那么可以考虑
上面讲的方案
或者Executors.newCachedThreadPool()