博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor 创建Worker Thread的策略
阅读量:6826 次
发布时间:2019-06-26

本文共 4508 字,大约阅读时间需要 15 分钟。

hot3.png

ThreadPoolExecutor创建新线程的条件:

  • coreThreadCount < 已创建线程数 < maxThreadCount, 或者 已创建线程数 < maxThreadCount 且 阻塞队列已满

因此如果 使用无界阻塞队列,那么线程数将只会达到 coreThreadCount(如果coreThreadCount == 0,那么会创建一个线程),而不会继续创建到 maxThreadCount.

例如:ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue
());事实上这个pool只会创建一个线程,这个pool事实上是串行执行的.

可以参见 ThreadPoolExecutor.execute(Runnable)方法:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

java 1.6 以后可以如此解决:

// 注意,这里 coreThreadCount 设置为和 maxThreadCount 一样大ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue
());// 这个的作用是使得coreThread和非coreThread采用一样的idle超时destroy策略.pool.allowCoreThreadTimeOut(true);

注意: 这样做的特点是

  • 提交一个task后,只要当前的 线程数(pool size)小于 core size,那么就会创建一个新的Worker Thread(原因参见上面列出的execute方法实现).
  • 就上面的例子来看,假设目前线程池里面已有4个线程,那么再次提交一个任务后(execute或者submit),线程池都会创建一个新的Worker Thread,即使那4个线程目前处于idle状态.如果这不符合你的要求,可以采用下面的方案: java 1.5 及以前
  • allowCoreThreadTimeOut(true) 的作用是当线程空闲时间超过设置的idle时间后,就会销毁该线程,即使当前线程数小于core size.因此空闲一段时间后,线程池中的线程将会减少到0.

java 1.5 及以前:

private static class SpecialBlockingQueue
extends LinkedBlockingQueue
{ private ThreadPoolExecutor executor; public void setExecutor(ThreadPoolExecutor executor) { synchronized (this) { if(this.executor != null) { throw new IllegalStateException("You can only call setExecutor() once!"); } if(executor == null) { throw new NullPointerException("executor argument can't be null!"); } this.executor = executor; } } @Override public boolean offer(T t) { synchronized (this) { if(executor == null) { throw new NullPointerException("you must call setExecutor() before use it!"); } int poolSize = executor.getPoolSize(); int activeSize = executor.getActiveCount(); int maxSize = executor.getMaximumPoolSize(); if(activeSize == poolSize && poolSize < maxSize) { return false; // let ThreadPoolExecutor create a new Thread } } return super.offer(t); }}// 注意,这里 coreThreadCount 就可以直接设置为 0SpecialBlockingQueue
specialBlockingQueue = new SpecialBlockingQueue<>();ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, 5, 2, TimeUnit.SECONDS, specialBlockingQueue, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("ThreadPoolExecutor-" + t.getId() + "-" + System.identityHashCode(t)); return t; }}, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace();// will never reach here } }});specialBlockingQueue.setExecutor(tpe);

备注

  • 如果线程池使用比较频繁,那么可能会出现Thread刚被销毁就需要创建的情况,可以考虑使用Executors.newFixedThreadPool()
  • 如果希望线程池中的线程数可以降为0.(非 daemon 的Thread将会阻止JVM退出)那么可以考虑上面讲的方案或者Executors.newCachedThreadPool()

转载于:https://my.oschina.net/fengheju/blog/664848

你可能感兴趣的文章
Cookie&Session(会话技术)
查看>>
「BZOJ1010」[HNOI2008] 玩具装箱toy(斜率优化)
查看>>
BZOJ2659: [Beijing wc2012]算不出的算式
查看>>
c#MD5加密解密
查看>>
重温CLR(十一) 枚举类型、位标志和数组
查看>>
cerr与cout的区别
查看>>
VFL语言使用
查看>>
艾科 驱动电路分析
查看>>
洛谷3794:签到题IV——题解
查看>>
CF Round #426 (Div. 2) The Useless Toy 思维 水题
查看>>
spring 注解
查看>>
C#获取DLL、程序路径,C#获取桌面、收藏夹等特殊系统路径
查看>>
【08】node 之 fs文件
查看>>
[01] radio ,checkbox 表单文字对齐
查看>>
解决wget下载文件名乱码的一些方法
查看>>
Python--day68--Django ORM的字段参数、元信息
查看>>
Python--day30--互联网协议与osi模型
查看>>
Python--day37--多进程中的方法join()
查看>>
PyQt4软件打包成exe文件
查看>>
【Foreign】冒泡排序 [暴力]
查看>>