请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

线程池的拒绝策略

想通过使用信号量来控制提交的任务,设置了最大size为10,信号量也设为了10,但是任务还是会被拒绝
以下是代码
public class BoundedExecutor {
    private final ExecutorService executorService;
    private final Semaphore semaphore;

    public BoundedExecutor(ExecutorService executorService, Semaphore semaphore) {
        this.executorService = executorService;
        this.semaphore = semaphore;
    }
    public void execute(Runnable runnable) throws InterruptedException {
        semaphore.acquire();
        executorService.execute(() -> {
            try {
                runnable.run();
            }finally {
                semaphore.release();
            }
        });
    }
}

ExecutorService executorService1 = new ThreadPoolExecutor(1,10,60L, TimeUnit.SECONDS, new SynchronousQueue<>());
BoundedExecutor boundedExecutor = new BoundedExecutor(executorService1,new Semaphore(10));
for (int i = 0;i <10000;i++){
    boundedExecutor.execute(runnable);
}
executorS


正在回答 回答被采纳积分+3

2回答

Jimin 2018-06-04 23:40:53

首先你这里的信号量基本没什么用,原因在于你这里已经设置了线程池最多10个线程执行,代表线程池最大并行数根本不会超过10

其次,你目前代码不全,我大致猜测是你期望使用指定的线程,然后不丢弃任何线程,保证都执行。如果我没误解你想达到的效果,你可以参考课程里Executors.newCachedThreadPool() 代码的实现,那里给出了合理的ThreadPool的参数,然后课程里演示的例子都是使用Executors.newCachedThreadPool() 结合Semaphore达到这个效果的。

这里,你对为什么会执行拒绝有困惑,根本是因为对线程调度的细节不是特别清楚。这里我把源码顺便说一下:

https://img1.sycdn.imooc.com//szimg/5b155b2a0001ec6b18521398.jpg

我本地是jdk8版本的代码,你可以对着看一下。首先能执行到拒绝策略,就是走到红框里面的代码,这里根本是需要新增线程到队列时失败了,我们具体看一下addWorker的实现:

https://img1.sycdn.imooc.com//szimg/5b155bb0000168bc19121442.jpg

你的代码出现错误根本是走到红框的判断那里,这里根本是拿maximumPoolSize 来进行判断的,只要大于了maximumPoolSize就会返回false了,然后走到执行策略,这是线程池自己的逻辑,因此需要给一个合理的maximumPoolSize值。

那么为什么Executors.newCachedThreadPool()就可以达到预期的效果呢,我们来看一下他的实现:

https://img1.sycdn.imooc.com//szimg/5b155ca90001349713640200.jpg

这里为了达到预期的效果, maximumPoolSize 给的是 Integer.MAX_VALUE。课程里在这基础上借助Semaphore来限制并发数。

最后,你目前这个封装有个严重的问题,就是每次声明一个实例基本上都要声明一个新的线程池和信号量的实例,却没有提供一个关闭线程池的方法。如果这个线程池需要一直用不关闭的话,那么你这个封装就没什么意义了,你可以自己体会一下~

0 回复 有任何疑惑可以回复我~
Jimin 2018-06-04 22:24:31

你好,你这个问题我有点疑问:

1)semaphore.acquire(); 的位置为什么不是在线程启动时获取信号,是手误?

2)你这里说的任务被拒绝,提供一下日志来分析

0 回复 有任何疑惑可以回复我~
  • 提问者 慕仰5076582 #1
    因为如果直接提交超过了线程池大小,不是就直接会被拒绝了吗,我就想在任务提交之前通过信号量控制提交任务的速度..不知道是不是理解有错误。
    
    日志
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.mall.concurrency.ThreadPool.BoundedExecutor$$Lambda$2/515132998@36aa7bc2 rejected from java.util.concurrent.ThreadPoolExecutor@76ccd017[Running, pool size = 10, active threads = 8, queued tasks = 0, completed tasks = 2]
    	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.mall.concurrency.ThreadPool.BoundedExecutor.execute(BoundedExecutor.java:24)
    	at com.mall.concurrency.ThreadPool.CachedThreadPoolExample.main(CachedThreadPoolExample.java:20)
    回复 有任何疑惑可以回复我~ 2018-06-04 22:27:44
  • Jimin 回复 提问者 慕仰5076582 #2
    你这理解的有点问题啊,线程还不确定什么时候执行你就提占着信号了,根本无法起到控制指定线程的作用啊
    回复 有任何疑惑可以回复我~ 2018-06-04 22:30:40
  • 提问者 慕仰5076582 回复 Jimin #3
    但是这样正常应该是会在acquire的时候阻塞的吧..而不是抛出RejectedExecutionException,因为只有在我一个任务执行完,然后release掉,这样才能再提交一个任务 去执行runable 的run方法
    回复 有任何疑惑可以回复我~ 2018-06-04 22:35:01
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信