博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池
阅读量:5241 次
发布时间:2019-06-14

本文共 10585 字,大约阅读时间需要 35 分钟。

8 线程池

8.1 如何解决任务和执行策略之间的耦合性问题?

8.1.1任务与策略存在哪几种耦合性?

a:依赖性任务

如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生活跃性问题。

b:使用线程封闭机制的任务

任务要求其执行所,在的Executo:是单线程的。如果将Executor从单线程环境改为线程池环境,那么将会失去线程安全性。(这个要求井不需要这么严格,只要确保任务不会井发执行,井提供足够的同步机制,使得一个任务对内存的作用对于下一个任务一定是可见的—这正是newSingleThreadExecutor提供的保证。)

c:对响应时间敏感的任务

如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor管理的服务的响应性。

d:使用丁hreadLocal的任务

ThreadLocal使每个线程都可以拥有某个变量的一个私有"版本"。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值。(因为线程池中的线程存在异常和回收的情况)

   

e :其他隐式约束

如果应用程序使用一个包含10个连接的JDBC连接池,并且每个任务需要一个数据库连接,那么线程池就好像只有10个线程,因为当超过10个任务时,新的任务需要等待其他任务释放连接。

   

8.1.2 防止线程饥饿死锁

线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,除

非线程池足够大,否则将发生线程饥饿死锁。(因此要在文档中记录线程池的大小和限制)

   

header=exec.submit(new LoadFileTask("header.html"));

footer=exec.submit(new LoadFileTask("footer.html"));

String page=renderBody();

//将发生死锁由于任务在等待子任务的结果

return header.get()+page+footer.get();

8.1.3  防止运行时间较长的任务

限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本. 如Thread.join, BlockingQueue.put, ountDownLatch.await以及Selector.select等。如果在线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小。

   

8.2 如何设置线程池的大小?

根据Runtime.availableProcessors来动态计算(CPU处理器的个数)。必须分析计算环境、资源预算和任务的特性。有多少个CPU?多大的内存?任务是计算密集型、I/O密集型还是二者皆可? 如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。

8.2.1

对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+ 1时,通常能实现最优的利用率。

8.2.2

对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。你必须估算出任务的等待时间与计算时间的比值。可以通过一些分析或监控工具来获得。(计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。)

池大小=cpu个数 *  cpu利用率 * (1 + 等待时间/计算时间)

 

   

   

8.3 配置线程池的核心类ThreadPoolExecutor

8.3.1 ThreadPoolExecutor家族图谱

8.3.2 ThreadPoolExecutor常用的构造方法

public ThreadPoolExecutor(

int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler);

 

int corePoolSize

核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

int maximumPoolSize

线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程

long keepAliveTime,

表示活动时间线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

TimeUnit unit

参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;          //天

TimeUnit.HOURS;          //小时

TimeUnit.MINUTES;         //分钟

TimeUnit.SECONDS;        //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue<Runnable> workQueue

一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

ThreadFactory threadFactory

线程工厂,主要用来创建线程

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略,有以下3种取值:

1:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异

2:ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

3:ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

 

8.3.3 线程的创建与销毁

8.3.4 队列

无界队列:无界的LinkedBlockingQueue(FIFO);

有界队列:有界的LinkedBlockingQueue(FIFO)、ArrayBlockingQueue(FIFO)、PriorityBlockingQueue(优先级队列)

 

同步移交:

最后一个BlockingQueue实现是SynchronousQueue。SynchronousQueue,避免任务排队,不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有意义。因为Synchronous}ueue没有存储功能,因此put和take会一直阻塞。

DelayQueue,它实现了BlockingQueue, 为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。

 

8.3.5 ThreadPoolExecutor的常用方法

execute()方法

实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法

是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()

是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法。

 

8.3.6 ThreadPoolExecutor的几个扩展方法

它提供了几个可以在子类化中改写的方法beforeExecute、afterExeeute和terminated。这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute o)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

在线程池完成关闭操作时调用terminated,可以用来释放Executor在其生命周期里分配的各种资源。还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

程序清单8-9增加了日志和计时等功能的线程池:

public class TimingThreadPOOl extends ThreadPoolExecutor{

private final ThreadLocal<Long> startTime

= new ThreadLocal<Long>();

private final Logger log=Logger.getLogger("TimingThreadPool");

private final AtomicLong numTasks=new AtomicLong();

private final AtomicLong totalTime=new AtomicLong();

protected void beforeExecute(Thread t,Runnable r){

super .beforeExecute(t,r);

log.fine (String. format("Thread %s: start %s" , t ,r ));

startTime .set(System.nanoTime());

}

protected void afterExecute(Runnable r, Throwable t){

try{

long endTime=System.nanoTime();

long taskTime=endTime - startTime .get();

numTasks.incrementAndGet();

totalTime.addAndGet(taskTime);

log.fine (String.format("Thread %s : end %s,time=%dns",

t,r, taskTime));

finally{

super.afterExecute(r, t);

}

}

protected void terminated(){

try{

log .info(String.format("Terminated: avg time=%dns",

totalTime.get()/numTasks.get()));

}finally{

super .terminated();

}

}

}

8.3.7 构造后定制ThreadPoolExecutor

1:调用Setter修改

在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(Rejected Execution Handler))。

2:用unconfigurableExecutorService防止修改

在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor的线程池大小,那么将破坏它的执行语义。你可以在自己的Executor中使用这项技术以防止执行策略被修改。如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。

8.4 Executors类

 

在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池。

8.4.1 newCachedThreadPool()

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)

通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。

适用:执行很多短期异步的小程序或者负载较轻的服务器

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

        60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);}

 

8.4.2 newFixedThreadPool() 

  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置,如untime.getRuntime().availableProcessors()。 

底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueue<Runnable>() 无解阻塞队列

newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,创建的线程池不会超时(keepAliveTime为0L)。

通俗:创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)

适用:执行长期的任务,性能好很多

public static ExecutorService newFixedThreadPool(int nThreads) {

     return new ThreadPoolExecutor(nThreads, nThreads,

          0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}

8.4.3 newSingleThreadExecutor() 

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例,corePoolSize为1;maximumPoolSize为1;keepAliveTime为0L;unit为:TimeUnit.MILLISECONDS;workQueue为:new LinkedBlockingQueue<Runnable>() 无解阻塞队列

通俗:创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)

适用:一个任务一个任务执行的场景

public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,

                  0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));} 

8.4.4 newScheduledThreadPool()

创建一个定长线程池,支持定时及周期性任务执行。

底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列

通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入elayedWorkQueue队列中,这是一种按照超时时间排序的队列结构

适用:周期性执行任务的场景

public static ScheduledExecutorService newScheduledThreadPool(

            int corePoolSize, ThreadFactory threadFactory){

return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);} 

返回的是接口

 

8.5 线程工厂

如果你希望修改线程的优先级(这通常并不是一个好主意。请参见10.3.1节)或者守护状态(同样,这也不是一个好主意。请参见7.4.2节)。或许你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。都需要用到定制的线程工厂。例如:

public class MyThreadFactory implements ThreadFactory {

private final String poolName;

public MyThreadFactory(String poolName){

this.poolName=poolName;

}

public Thread newThread(Runnable runnable){

return new MyAppThread(runnable,poolName);

}

}

 

如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限,从而导致令人困惑的安全性异常。

 

8.6 饱和拒绝策略

当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。

8.6.1 AbortPolicy

中止策略,默认的策略。抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。

8.6.2 Ca1lerRunsPolicy

"调用者运行"。一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。当服务器过载时,这种过载情况会逐渐向外蔓延开来—从线程池到工作队列到应用程序再到TCP层,最终达到客户端。实现一种平缓的性能降低。

8.6.3 DiscardPolicy

抛弃策略。悄悄抛弃该任务,不抛出异常

8.6.4 DiscardOldestPolicy

"抛弃最旧的"。抛弃下一个将被执行的任务(队列最前面的任务),然后尝试重新提交新的任务。如果是一个优先队列,那么将抛弃优先级最高的任务,因此最好不要将"抛弃最旧的"饱和策略和优先级队列放在一起使用。

 

8.7 创建线程池的几种方式

1

如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程"饥饿"死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。

2

对于提交其他任务并等待其结果的任务来说,还有另一种配置方法,就是使用有界的线程池,并使用SynchronousQueue作为工作队列,以及"调用者运行(Caller-Runs )"饱和策略。

 

3

开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。然而,如果在线程池中没有使用SynchronousQueue作为其工作队列(例如在newCachedThreadPool中就是如此),那么这种方式将产生一些奇怪的行为。如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程。因此,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常井不是我们所希望的。在Java 6中,可以通过allowCorel'hreadTimeOut来使线程池中的所有线程超时。对于一个大小有限的线程池并且在该线程池中包含一个工作队列,如果希望这个线程池在没有任务的情况下能销毁所有线程,那么可以启用这个特性并将基本大小设置为零。

8.8 递归算法并行化

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转化为并行循环。

 

 

 

 

 

转载于:https://www.cnblogs.com/domi22/p/8538241.html

你可能感兴趣的文章
利用AOP写2PC框架(二)
查看>>
【动态规划】skiing
查看>>
java定时器的使用(Timer)
查看>>
Android实现静默安装与卸载
查看>>
ef codefirst VS里修改数据表结构后更新到数据库
查看>>
boost 同步定时器
查看>>
[ROS] Chinese MOOC || Chapter-4.4 Action
查看>>
简单的数据库操作
查看>>
解决php -v查看到版本与phpinfo()版本不一致问题
查看>>
iOS-解决iOS8及以上设置applicationIconBadgeNumber报错的问题
查看>>
亡灵序曲-The Dawn
查看>>
Redmine
查看>>
帧的最小长度 CSMA/CD
查看>>
xib文件加载后设置frame无效问题
查看>>
编程算法 - 左旋转字符串 代码(C)
查看>>
IOS解析XML
查看>>
Python3多线程爬取meizitu的图片
查看>>
树状数组及其他特别简单的扩展
查看>>
zookeeper适用场景:分布式锁实现
查看>>
110104_LC-Display(液晶显示屏)
查看>>