`

管理你的线程池(Executor)

阅读更多
    我们都知道使用线程池能够控制线程的数量,尤其是大量的“短命”线程存在时,线程池将大大降低系统消耗(内存和CPU)。不过,线程池也同样需要管理,于是我写了本篇。
首先,我们来看看管理器的整个继承关系:



    显而易见,有ThreadPoolExecutor和ScheduledThreadPoolExecutor两个实现类,当然Executor类里也有一些内部类实现了特定的功能(如class DelegatedScheduledExecutorService),我们也可以自己通过扩展这里所有的接口、抽象类、类来实现自己的特定功能,如继承ThreadPoolExecutor类,覆写beforeExecute(),让它在每个任务开始执行前执行某些操作,还有很多可扩展功能,有兴趣的朋友可以自己摸索。
    你有两种方法创建上面管理器的实例:
1、你可以用上面介绍的两个类的那这些类的实例的构造函数来创建管理器的实例,不过你要自己配置一些诸如池最大尺寸(maximumPoolSize )的参数。
2、Executors提供各种创建上面的类的实例的方法,它默认一些参数的设置。我主要介绍
这种方法中的newFixedThreadPool(int)和newCachedThreadPool()


------------newFixedThreadPool(int)------------
     创建一个默认尺寸的池,它同时运行的线程数将是固定的,如果你要让它课同时运行的最大线程数大于初始设置的那个参数,可以调用setMaximumPoolSize()来设置额外的线程来并行处理更多的任务。
     我们调用下面的方法来添加新的任务,到底Executors是如何处理的呢?

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if(poolSize>=corePoolSize|| !addIfUnderCorePoolSize(command)) {
//如果实时连接数小于corePoolSize,那么调用addIfUnderCorePoolSize()方法
            if (runState == RUNNING && workQueue.offer(command)) {
	//如果实时连接数大于了corePoolSize,那么将任务加进等待队列中。
                if (runState != RUNNING || poolSize == 0)
	//在执行workQueue.offer(command)的过程中shutdown了,确保所有的已经提交任务能够成功执行完。
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
	
                reject(command); // is shutdown or saturated
        }
    }

下面我们来看下poolSize>=corePoolSize为不同状态时两种执行方法:
 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
	//首先获取本类所有同步方法的锁
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }
 
 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
	//首先获取本类所有同步方法的锁
	 Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;

	}
几乎完全一样,估计author Doug Lea当初也是直接copy的吧。
这两个方法都调用了
private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
	//这里并没有区分maximumPoolSize 和corePoolSize 
	Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);//workers并没有尺寸的限制
            int nt = ++poolSize;
	//这一步维护一个管理器使用过程中的最大尺寸,没什么好说的。
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }
于是我认为发现管理器在对待aximumPoolSize 和corePoolSize 时根本没有什么区别,可是这是不正确的,至于为什么,大家可以自己去探索!

      ThreadPoolExecutor类内部有一个:
private final HashSet<Worker> workers = new HashSet<Worker>();
其中Worker类是ThreadPoolExecutor一个内部类,实现了Runable接口。在addIfUnderMaximumPoolSize()和addIfUnderCorePoolSize()两个方法中将任务添加进这个workers[]中,这个数组维护一个正在运行的任务组,这个数组中的一个元素对应一个正在运行的线程,如果一个线程以外死亡,数组中的元素没有被移除,管理器将自动创建一个新的线程继续从头开始执行刚刚那个以外死亡的数组对应的任务。
       如此神奇?那是如何实现的?
       很简单,ThreadPoolExecutor维护的线程的run方法都是在这个loop中的,

while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }

如果意外死亡,task=null不执行,重新判断条件的时候再次调用runTask(task);即,死亡的是runTask(task)方法内部的run()调用而已。

      说到这里,大家应该明白了,管理器无非就是用BlockingQueue<Runnable> workQueue队列(注意这个队列是线程安全的,挺有意思)来缓冲多出来的任务,而总是有不大于maximumPoolSize(注意,这里不是corePoolSize )的线程在运行着,再有点异常死亡处理的能力而已。



--------newCachedThreadPool()--------

这个方法源码:
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
	}


    原来,就是让corePoolSize =0,maximumPoolSize=最大整数,然后设置空闲线程的存活时间为60s而已。看到这里,大家或许会冒出这样一个疑问:既然corePoolSize 是0,那么不是运行不了任何线程吗?呵呵,大家如果认真看了前面的文章就会有此疑问了。看同时刻运行的线程最大数是看参数maximumPoolSize不是corePoolSize 。
至于存活时间设置,那是很有必要,否则

while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }

getTask方法中从待执行任务缓冲队列中poll()任务的时候会有一个存活时间的超时机制,如果超时将返回null,这个线程将因为一系列连锁反应,最终死亡。

      好了,看似简单的Executor我砍了这么多,顺序整理的不是很好,大家将就看看吧。
总结一下,在设计这几个类的时候用到集合、同步(锁和阻塞队列)、枚举(TimeUnit)、多线程、安全控制(本文没有涉及)、工厂设计模式等等知识点,不简单哪^-^

  • 大小: 55.7 KB
分享到:
评论
26 楼 angel243fly 2010-12-26  
没用过这些类呢,学习了
25 楼 C_J 2010-12-18  
挺好,再分析形象一点就更好了。
补充2点吧

引用

继承ThreadPoolExecutor类,覆写beforeExecute()

1,貌似不好extends,有很多final private,用组合来做更好。

2,ThreadPoolExecutor有4种中断策略来应对非常极端的没有线程可用且无法缓存任务的情况。
24 楼 贾懂凯 2010-12-17  
sam_kee 写道
能不能给个源码呢?O(∩_∩)O~,先谢楼主

估计这位仁兄没认真看,是JDK里的固有类。
23 楼 sam_kee 2010-12-17  
能不能给个源码呢?O(∩_∩)O~,先谢楼主
22 楼 贾懂凯 2010-12-17  
yunchow 写道
effective java  里讲的很清楚

除了effective java、Java编程思想,我推荐《Java 核心技术》(机械工业出版社,基础讲的很透彻)。
不知在Java核心技术方面大家还有什么好书推荐的?
21 楼 yunchow 2010-12-17  
effective java 里讲的很清楚
20 楼 贾懂凯 2010-12-16  
78425665 写道
贾懂凯 写道
78425665 写道
我有个问题,还请指教下。

像这样
		int i = 0;
		while(i++ < 50000){
			Object obj = "...";// 假设这个obj有点大,会占点内存,假如每个占1M
			threadPool.execute(new MyThread(obj));// threadPool继承的ThreadPoolExecutor
		}


假如corePoolSize=10,maximumPoolSize = 15吧

跑了不到一分钟,发现机器内存(1G)全被吃光了。队列都满了,可程序还在跑,threadPool还在加线程,这时它加到哪里去了?

建议测试一下,内存满了看是不是还能加。如果还能加,那只能说明你的内存计算方法是错的,根本没溢出,不是一个Object 1M10个Object就是10M,这种算法是错的。


没溢出,内存不会溢出,先不管我的内存计算方法

我是说,线程队列满了,然后循环程序还在跑,这时候线程池新加的线程,到哪里去了?

不是加的线程到哪里去的问题,而是如果你限制了任务缓存队列的尺寸,到底能不能加进去的问题!建议自己去看源码。
我可以给出一个结论-会抛出异常。
19 楼 78425665 2010-12-16  
贾懂凯 写道
78425665 写道
我有个问题,还请指教下。

像这样
		int i = 0;
		while(i++ < 50000){
			Object obj = "...";// 假设这个obj有点大,会占点内存,假如每个占1M
			threadPool.execute(new MyThread(obj));// threadPool继承的ThreadPoolExecutor
		}


假如corePoolSize=10,maximumPoolSize = 15吧

跑了不到一分钟,发现机器内存(1G)全被吃光了。队列都满了,可程序还在跑,threadPool还在加线程,这时它加到哪里去了?

建议测试一下,内存满了看是不是还能加。如果还能加,那只能说明你的内存计算方法是错的,根本没溢出,不是一个Object 1M10个Object就是10M,这种算法是错的。


没溢出,内存不会溢出,先不管我的内存计算方法

我是说,线程队列满了,然后循环程序还在跑,这时候线程池新加的线程,到哪里去了?
18 楼 贾懂凯 2010-12-16  
78425665 写道
我有个问题,还请指教下。

像这样
		int i = 0;
		while(i++ < 50000){
			Object obj = "...";// 假设这个obj有点大,会占点内存,假如每个占1M
			threadPool.execute(new MyThread(obj));// threadPool继承的ThreadPoolExecutor
		}


假如corePoolSize=10,maximumPoolSize = 15吧

跑了不到一分钟,发现机器内存(1G)全被吃光了。队列都满了,可程序还在跑,threadPool还在加线程,这时它加到哪里去了?

建议测试一下,内存满了看是不是还能加。如果还能加,那只能说明你的内存计算方法是错的,根本没溢出,不是一个Object 1M10个Object就是10M,这种算法是错的。
17 楼 78425665 2010-12-16  
我有个问题,还请指教下。

像这样
		int i = 0;
		while(i++ < 50000){
			Object obj = "...";// 假设这个obj有点大,会占点内存,假如每个占1M
			threadPool.execute(new MyThread(obj));// threadPool继承的ThreadPoolExecutor
		}


假如corePoolSize=10,maximumPoolSize = 15吧

跑了不到一分钟,发现机器内存(1G)全被吃光了。队列都满了,可程序还在跑,threadPool还在加线程,这时它加到哪里去了?
16 楼 hobitton 2010-12-16  
贾懂凯 写道
hobitton 写道
囧,为何题为管理线程池呢?这不是对着JDK源码讲了下线程池的实现方式吗?没搞懂。

http://blog.csdn.net/yangdengfeng2003/archive/2009/04/01/4042250.aspx

这篇和我说的一个主题,而且比我权威。不过,我特别不喜欢看那种结论性的文字,那令我头疼!从源码看清晰简单。

其实我主要是想问为啥取这个名字 

那篇文章和你的结合不就好了,看了源码总的需要全面总结下。每次记不得了还看下源码不是件很杯具的事情?
15 楼 yeshucheng 2010-12-16  
再深入下 就是BlockingQueue的原理,写的不错
14 楼 贾懂凯 2010-12-16  
hobitton 写道
囧,为何题为管理线程池呢?这不是对着JDK源码讲了下线程池的实现方式吗?没搞懂。

http://blog.csdn.net/yangdengfeng2003/archive/2009/04/01/4042250.aspx

这篇和我说的一个主题,而且比我权威。不过,我特别不喜欢看那种结论性的文字,那令我头疼!从源码看清晰简单。
13 楼 hobitton 2010-12-16  
囧,为何题为管理线程池呢?这不是对着JDK源码讲了下线程池的实现方式吗?没搞懂。

http://blog.csdn.net/yangdengfeng2003/archive/2009/04/01/4042250.aspx
12 楼 mib168 2010-12-16  
看来还得深入看,对线程池这块还比较陌生。
11 楼 zhaoxin1943 2010-12-16  
贾懂凯 写道
zhaoxin1943 写道
关于线程池,java编程思想里讲的比较好,个人觉得。

布置zhaoxin1943有没有相关文章,定拜读~~呵呵

本人小白加菜鸟,简称小白菜。看您的文章,受益匪浅。
10 楼 贾懂凯 2010-12-16  
zhaoxin1943 写道
关于线程池,java编程思想里讲的比较好,个人觉得。

布置zhaoxin1943有没有相关文章,定拜读~~呵呵
9 楼 zhaoxin1943 2010-12-16  
关于线程池,java编程思想里讲的比较好,个人觉得。
8 楼 peak 2010-12-15  
楼主最好把源代码贴出来研究一下,jdk写的这个线程池确实不错,以前都是我们自己写,现在用这现成的确实方便了很多
7 楼 贾懂凯 2010-12-15  
javamonkey 写道
maximumPoolSize 跟queue总类关系很大,如果是容量无限的话,maximumPoolSize 设置无效

刚看了一下确实如此,如果超过corePoolsize话,新的任务会首先尝试添加进等待队列queue中,如果添加进队列失败(超时或队列设置了固定的大小下溢出),才会利用maximunPoolSize属性。

相关推荐

    java线程池ThreadPoolExecutor类使用详解.docx

    另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、...

    针对于Executor框架,Java API,线程共享数据

    Executor框架是Java并发编程中的一个重要工具,它提供了一种管理线程池的方式,使得我们可以更方便地管理线程的生命周期和执行线程任务。 原子操作是指不可被中断的操作,要么全部执行成功,要么全部不执行。原子...

    python线程池如何使用

    线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池...

    Java 线程池框架核心代码分析1

    前言多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和资源消耗都是很高的。线程池应运而生,成为我们管理线程的利器。Java 通过 Executor

    线程池ThreadPoolExecutor

    线程的创建和销毁比较消耗资源,所以有一种更加高效快捷的方式管理线程—-线程池。 先来看一下线程池的java模型 Executor:线程池顶级接口,只有一个方法 ExecutorService:真正的线程池接口 void execute(Runnable...

    java线程池常用方法.docx

    在Java5之后,并发线程这块发生了根本的变化,最重要的莫过于新的启动、调度、管理线程的一大堆API了。在Java5以后,通过Executor来启动线程比用Thread的start()更好。在新特征中,可以很容易控制线程的启动、执行...

    什么是线程?Java中如何创建和管理线程?(java面试题附答案).txt

    通过将 MyRunnable 对象传递给 Thread 类...Executor 框架和线程池:用于管理和调度线程的执行。 通过合理地创建和管理线程,我们可以实现复杂的并发执行逻辑,提高程序的性能和响应能力,并确保线程之间的安全和协调。

    Java线程池框架核心代码分析

    线程池应运而生,成为我们管理线程的利器。Java 通过Executor接口,提供了一种标准的方法将任务的提交过程和执行过程解耦开来,并用Runnable表示任务。  下面,我们来分析一下 Java 线程池框架的实现...

    第7章-JUC多线程v1.1.pdf

    线程池的顶层接口是Executor, 这个接口定义了一个核心方法executor(Runnable command), 这个方法最后被ThreadPoolExecutor类实现, 这个方法用来传入任务, 并且该类是线程池的核心类, 构造方法如下 : public ...

    althena:一个简单的下载管理器使用AsyncTask Executor

    下载管理器组件 特征: 支持在AsyncTask线程池中下载文件

    JUC多线程学习个人笔记

    线程池:JUC提供了Executor框架,可以方便地创建和管理线程池,实现任务的异步执行和线程的复用。 并发集合:JUC提供了一些线程安全的集合类,如ConcurrentHashMap、ConcurrentLinkedQueue等,可以在多线程环境下...

    MutliThreading-and-Concurrency

    Java线程池和ThreadPoolExecutor Java线程池管理工作线程池,它包含使任务等待执行的队列。Java线程池管理Runnable线程的集合,工作线程从队列中执行Runnable。 Java 5,Java并发API提供了一种机制Executor框架。 ...

    Spark 3.0.0集群启动原理和源码详解

    本课程讲解Spark 3.0.0 集群启动原理和源码详解的内容,包括:... Executor 中任务的执行: Executor 中任务的加载、Executor 中的任务线程池、 任务执行失败处理、 揭秘TaskRunner; Executor 执行结果的处理方式。 

    java8集合源码分析-javaInterview:java面试

    线程池下executor 的futre方法? (听不太清 executorService 用过么? 线程池具体 怎么使用 ? java怎么加载一个类? 反射怎么加载类? wait notify为什么放在 Object 下? java几个类加载器? 类加载方式? 动态代理和静态...

    Java面试问题带答案40道.docx

    另外,还可以使用Executor框架或线程池来管理线程。 3. Java中什么是抽象类? 答:抽象类是一种不能被实例化的类,只能作为其他类的基类。它可以包含抽象方法,用于定义子类必须实现的方法。 4. Java中什么是接口...

    Java 7并发编程实战手册

    java7在并发编程方面,带来了很多令人激动的新功能,这将使你的应用程序具备更好的并行任务性能。 《Java 7并发编程实战手册》是Java 7并发编程的实战指南,介绍了Java 7并发API中大部分重要而有用的机制。全书分为9...

    Android-Thread-Pool:一个有用的线程执行器,它管理池中所有可运行的线程

    Android线程池 一个简单的Android线程池,该池基于Handler和Runable #联络我 电子邮件: 博客: : 如何使用? 创建实例 CustomExecutor executor = new CustomThreadPoolExecutor . Builder () ....

    concurrent_rails::joystick:小型图书馆,可让并发Ruby和Rails一起玩

    Rails有一种复杂的线程管理方式,称为Executor,并发-ruby(最具体地说是 )无法与之无缝配合。 这个gem的目标是提供一个简单的库,使开发人员无需担心Rails的Executor和随之而来的所有问题就可以使用Futures:...

    Java SE实践教程 源代码 下载

    6.3.1 线程池和Executor 124 6.3.2 Callable和Future 126 6.3.3 ScheduledExecutorService 127 6.4 线程安全的集合和同步器 128 6.4.1 阻塞队列 128 6.4.2 指定阻塞时间 130 6.4.3 同步器 131 6.4.4 Atomic...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    6.3.1 线程池和Executor 124 6.3.2 Callable和Future 126 6.3.3 ScheduledExecutorService 127 6.4 线程安全的集合和同步器 128 6.4.1 阻塞队列 128 6.4.2 指定阻塞时间 130 6.4.3 同步器 131 6.4.4 Atomic...

Global site tag (gtag.js) - Google Analytics