国内最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2
您当前位置:首页 > php开源 > php教程 > 线程池(一)

线程池(一)

来源:程序员人生   发布时间:2016-07-14 14:42:23 阅读次数:5617次

甚么是线程池?为何要使用线程池?

进行数据库开发的时候,我们应当都接触过数据库连接池,为了不每次进行数据库连接的时候都重新新建和烧毁数据库连接,我们可使用1个数据库连接池来保护1些数据库连接,让他们长时间保持1个激活的状态,当系统需要使用使用数据库的时候,就从连接池中拿来1个可用的连接便可,而不是创建新的连接。反之,当我们需要关闭连接的时候,也不是真的关闭连接,而是将这个连接返还给连接池。通过这样的方式,可以节俭很多的创建和烧毁对象的时间。
其实线程池也是类似的概念,线程池中总有那末几个活跃的线程,当你需要的时候可以从线程池里随意拿来1个空闲线程,当完成工作时其实不着急关闭线程,而是返回给线程池,方便其他人使用。
这里我们肯定还有疑问为何我们传统的创建自定义线程有甚么问题?问题就是虽然线程是1种轻量级的工具,但它的创建和关闭都需要花费时间,如我们在程序中随便的创建线程而不加控制其数量,反而会耗尽cpu和内存资源。即使没有outofmemory异常,大量的回收线程也会致使GC停顿的时间延长。所以我们实际中可以优先斟酌使用线程池对线程进行控制和管理,更加有效的公道的使用线程进行提高程序的性能。

jdk对线程池的支持

Jdk提供了1套Executor框架,核心成员以下图:

其中ThreadPoolExecutor是1个线程池,Executors扮演着1个线程工厂的角色,通过Executors类可以获得1个具有特定功能的线程池。通过UML图我们可以看到ThreadPoolExecutor实现Executor接口通过这个接口,任何Runable对象都可以被ThreadPoolExecutor线程池调度。
Executors主要提供以下工厂方法:
static ExecutorService newCachedThreadPool() static ExecutorService newFixedThreadPool(int nThreads) static ExecutorService newSingleThreadExecutor() static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) static ScheduledExecutorService newSingleThreadScheduledExecutor()
newCachedThreadPool()方法:该方法返回1个会根据实际情况进行调剂的线程池,线程数量不肯定。但如果有空闲的线程可复用,则优先选择可复用的线程。若当先线程都在工作,同时又有新的任务提交,则会创建新的线程来处理。所有线程在当前任务完成的时候,将返回线程池进行复用。
newFixedThreadPool(int nThreads)方法:该方法返回1个具有固定数量的线程的线程池。该线程池中的线程数量始终不变。当1个任务提交时,若有空闲线程则履行,若没有,会被交给1个任务队列,当有空闲线程的时候,就会处理任务队里的任务。
newScheduleThreadPool(int corePoolSize)方法:该方法会返回1个ScheduledExecutorService对象。ScheduledExecutorService接口在ExecutorService接口的基础上扩大了在给定时间履行某任务的功能,如某个固定时间后开始履行,或周期性的履行某个任务。
newSingleThreadScheduleExecutor();也返回1个ScheduledExecutorService对象,不过线程池只有1个线程。
下面简单演示下newFixedThreadPool(int nThreads)的简单使用:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 简单展现newFixedThreadPool * @author zdm * */ public class ThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":ThreadId:" + Thread.currentThread().getId()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(5); for(int i = 0; i < 10; i++) { es.execute(new MyTask()); } } }
运行结果:

有运行结果可以知道具有5个线程的线程池的把10个任务分两批完成,前5个和后5个任务恰好相差了1秒,因而可知上面程序符合newFixedThreadPool产的线程池的行动。

有时间计划的任务:

newScheduleThreadPool(int corePoolSize)返回1个ScheduleExecutorService对象,可以根据时间需要对线程进行调度,它的主要方法以下:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) //创建并履行在给定延迟后启用的1次性操作。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) //创建并履行1个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始履行,然后在 initialDelay+period 后履行,接着在 initialDelay + 2 * period 后履行,依此类推。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) //创建并履行1个在给定初始延迟后首次启用的定期操作,随后,在每次履行终止和下1次履行开始之间都存在给定的延迟。
与其他线程池不同,ScheduledExecutorService其实不1定会立即安排履行任务,它会在指定的时间,对任务进行调度,起到的计划任务的作用。
这里需要注意的就是scheduleAtFixedRate和scheduleWithFixedDelay这两个方法都是对任务进行周期性的调度,但是又有1点不同。
对FixedRate的方式来讲,任务调度的频率是1定的,它是以上1个任务开始履行的时间为出发点,以后的period时间,调度下1次任务。而FixedDealy则是上1个任务结束后,在经过delay对下1次任务进行调度。
如果还有疑问,我们可以官方的文档对它两的解释:
scheduleAtFixRate(Runnable command, long initialDealy, long period, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay  then initialDelay+period, then initialDelay + 2 * period, and so on.
翻译:创建1个周期性的任务,任务开始于指定的初始延迟,后续的任务依照给定的周期履行:后续第1个任务将会在initialDelay+period,下1个任务将在initialDelay+2period履行。
scheduleWithFixedDelay(Runnable command, long initialDealy, long delay, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.  If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
翻译:创建1个周期性的任务,任务开始于指定的初始延迟,后续的任务依照给定的延迟履行,即上1个任务结束的时间到下1个任务开始时间的时间差。
下面可以简单演示下ScheduledExecutorService的scheduleAtFixedRate的方法:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * ScheduledExecutorService的简单示例 * @author zdm * */ public class ScheduleExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 0, 2, TimeUnit.SECONDS); } }
运行结果:

这个任务履行使用1秒,周期为2秒,也就是2秒履行1次任务,恰好运行结果符合我们的预期目标。
但是这里有1个问题就是如果任务的履行时间大于调度周期的时间会产生怎样办?这里我们将TimeUnit.SECONDS.sleep(5)尝试1下

会发现任务的周期调度变成了5秒~~~
如果采取scheduledWithFixedDelay()调用会依照修改任务履行需要5秒,延迟为2秒,那末任务的实际间隔为7秒。
这里还需要注意1个问题:那就是调度程序其实不会无穷期的延续等待。如果任务本身产生了异常,那末后续的子任务都会停止调用。所以需要对异常进行及时的处理,以保证周期性任务的稳定性。

线程池的内部实现

对核心的那几个线程池,虽然看上去功能各不相同其实内部都是使用了ThreadPoolExecutor实现:

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
从上面我们可以发现它们都是ThreadPoolExecutor的封装,为何功能如此强大?我们可以看1下ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

方法的参数含义以下:
corePoolSize:表示线程中的线程数量。
maximuumPoolSize:表示线程池中最大的线程数量。
keepAliveTime:当线程数量超过corePoolSize时,过剩的空闲线程的存活时间
unit:表示存活时间单位
workQueue:任务队列,被提交但却没有没履行的任务。
threadFactory:线程工厂,用于创建线程,1般用默许的便可。
handler:谢绝策略,当任务太多来不及处理时,如何谢绝。
其中的参数workQueue是1个BlockingQueue<Runnable>接口,它有几种不同功能的子类队列:

先认识1下Blocking:

阻塞队列,顾名思义,首先它是1个队列,而1个队列在数据结构中所起的作用大致以下图所示:


从上图我们可以很清楚看到,通过1个同享的队列,可使得数据由队列的1端输入,从另外1端输出;
经常使用的队列主要有以下两种:(固然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的1种)
  先进先出(FIFO):先插入的队列的元素也最早出队列,类似于排队的功能。从某种程度上来讲这类队列也体现了1种公平性。
  落后先出(LIFO):后插入队列的元素最早出队列,这类队列优先处理最近产生的事件。

      多线程环境中,通过队列可以很容易实现数据同享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现二者之间的数据同享。假定我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据同享给消费者线程,利用队列的方式来传递数据,就能够很方便地解决他们之间的数据同享问题。但如果生产者和消费者在某个时间段内,万1产生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据积累到1定程度的时候,那末生产者必须暂停等待1下(阻塞生产者线程),以便等待消费者线程把积累的数据处理终了,反之亦然。但是,在concurrent包发布之前,在多线程环境下,我们每一个程序员都必须去自己控制这些细节,特别还要统筹效力和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),1旦条件满足,被挂起的线程又会自动被唤醒)

下面两幅图演示了BlockingQueue的两个常见阻塞场景:


如上图所示:当队列中没有数据的情况下,消费者真个所有线程都会被自动阻塞(挂起),直到有数据放入队列。


如上图所示:当队列中填满数据的情况下,生产者真个所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

 这也是我们在多线程环境下,为何需要BlockingQueue的缘由。作为BlockingQueue的使用者,我们不再需要关心甚么时候需要阻塞线程,甚么时候需要唤醒线程,由于这1切BlockingQueue都给你1手包办了。既然BlockingQueue如此神通广大,让我们1起来见识下它的经常使用方法:

BlockingQueue的核心方法

放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前履行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.
获得数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出1个队首的对象,如果在指定时间内,队列1旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
  drainTo():1次性从BlockingQueue获得所有可用的数据对象(还可以指定获得数据的个数),通过该方法,可以提升获得数据效力;不需要屡次分批加锁或释放锁。

BlockingQueue成员详细介绍

1. ArrayBlockingQueue
      基于数组的阻塞队列实现,在ArrayBlockingQueue内部,保护了1个定长数组,以便缓存队列中的数据对象,这是1个经常使用的阻塞队列,除1个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
  ArrayBlockingQueue在生产者放入数据和消费者获得数据,都是共用同1个锁对象,由此也意味着二者没法真正并行运行,这点特别不同于LinkedBlockingQueue;依照实现原理来分析,ArrayBlockingQueue完全可以采取分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,或许是由于ArrayBlockingQueue的数据写入和获得操作已足够轻巧,以致于引入独立的锁机制,除给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有1个明显的不同的地方在于,前者在插入或删除元素时不会产生或烧毁任何额外的对象实例,而后者则会生成1个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对GC的影响还是存在1定的区分。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是不是采取公平锁,默许采取非公平锁。
2. LinkedBlockingQueue
      基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也保持着1个数据缓冲队列(该队列由1个链表构成),当生产者往队列中放入1个数据时,队列会从生产者手中获得数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区到达最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉1份数据,生产者线程会被唤醒,反之对消费者这真个处理也基于一样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还由于其对生产者端和消费者端分别采取了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高全部队列的并发性能。
作为开发者,我们需要注意的是,如果构造1个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默许1个类似无穷大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度1旦大于消费者的速度,或许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最经常使用的阻塞队列,1般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
3. DelayQueue
      DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获得到该元素。DelayQueue是1个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永久不会被阻塞,而只有获得数据的操作(消费者)才会被阻塞。
使用处景:
  DelayQueue使用处景较少,但都相当奇妙,常见的例子比如使用1个DelayQueue来管理1个超时未响应的连接队列。
4. PriorityBlockingQueue
      基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue其实不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间1长,会终究耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采取的是公平锁。
5. SynchronousQueue
      1种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的终究消费者,而消费者必须亲身去集市找到所要商品的直接生产者,如果1方没有找到适合的目标,那末对不起,大家都在集市等待。相对有缓冲的BlockingQueue来讲,少了1个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在乎经销商终究会将这些产品卖给那些消费者,由于经销商可以库存1部份商品,因此相对直接交易模式,整体来讲采取中间经销商的模式会吞吐量高1些(可以批量买卖);但另外一方面,又由于经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会下降。
  声明1个SynchronousQueue有两种不同的方式,它们之间有着不太1样的行动。公平模式和非公平模式的区分:
  如果采取公平模式:SynchronousQueue会采取公平锁,并配合1个FIFO队列来阻塞过剩的生产者和消费者,从而体系整体的公平策略;
  但如果是非公平模式(SynchronousQueue默许):SynchronousQueue采取非公平锁,同时配合1个LIFO队列来管理过剩的生产者和消费者,而后1种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,便可能有某些生产者或是消费者的数据永久都得不到处理。所以在使用SynchronousQueue时会设置很大的maximumPoolSize,而否则会很容易履行谢绝策略。
  • 小结
      BlockingQueue不光实现了1个完全队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以疏忽这些细节,关注更高级的功能。 
由此可以知道,在使用newCachedThreadPool时,当提交的任务过量时,没有空闲的线程,使用SynchronousQueue,它是直接提交任务的队列,从而迫使线程池创建新的线程来处理任务。当任务履行完成,它会被指定的时间内被回收。由于maximumPoolSize=0。所以当有大量任务提交,而任务的处理又不是很快的情况下,会致使系统资源的耗尽。
使用newFixedThreadPool和newSingleThreadExecutor时应当注意无界的LinkedBlockingQueue的增长。
下面给出线程池的核心调度任务的代码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
代码第5行的workerCountOf()方法获得当前线程池中的线程总数,当线程总数小于corePoolSize时,会将任务通过方法addWorker()直接调度。否则workQueue.offer()进入任务队列,如果进入任务队列失败(有界队列到达上限或使用SynchronousQueue),则会履行17行,将任务提交到线程池,当前线程数到达maximumPoolSize,则提交失败,使用谢绝策略,未到达,则分配线程履行。






生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
程序员人生
------分隔线----------------------------
分享到:
------分隔线----------------------------
关闭
程序员人生