Posts /

线程池

16 Aug 2020

线程池


参考:

线程池的种类,区别和使用场景

Java线程池类型介绍

Java并发编程:线程池的使用

为什么要用线程池

当我们需要的并发执行线程数量很多时,且每个线程执行很短的时间就结束了,这样,我们频繁的创建、销毁线程就大大降低了工作效率(创建和销毁线程需要时间、资源)。 java中的线程池可以达到这样的效果:一个线程执行完任务之后,继续去执行下一个任务,不被销毁,这样线程利用率提高了。

线程池种类

参考:

4种常用线程池介绍

1. newCachedThreadPool:

​ 可缓存线程池。

流程:

​ 先查看池中有没有以前建立的线程,如果有则直接使用。如果没有则新建一个线程加入线程池。

​ 他根据需要创建线程,corePoolSize为0,当60s内没有任务时(可以通过自己创建线程池来设置该时间),将会回收存活的线程,60s内有任务时,他可以重用已有的线程。注意他的工作队列是SynchronousQueue,他的每一个put操作必须等待take操作,这意味着如果任务生产速度大于消费速度,那么他将不会创建新线程。该线程池适合执行大量小任务的场景。

可缓存线程池实现

1597546156736

可缓存线程池实践 1

可缓存线程池的复用情况。

package javatest.threadpool;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description: 缓存线程池测试
 * @modifyContent:
 * @author: Maple Chan
 * @date: 2020-07-28 21:21:29
 * @version: 0.0.1
 */
public class NewCachedTest {
    static long startTime;

    public static void main(String[] args) {

        /**
         * 手动创建线程池,效果会更好哦。 (rule: AlibabaJavaConcurrent-ThreadPoolCreationRule)Java P3C
         * CheckerThreadPoolCreationRule
         */

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "-正在执行...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        System.out.println("New cached thread test finished!");
    }
}
// 输出:
/*
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
New cached thread test finished!
pool-1-thread-1-正在执行...
// 此时程序还没退出,因为线程池还没有terminated
*/

通过上述代码测试结果发现,该线程池一共只启动两个线程,pool-1-thread-1pool-1-thread-2

可缓存线程池为无限大,当执行当前任务时上一个任务已经完成,会复用执行上一个任务的线程,而不用每次新建线程。

可缓存线程池实践 2

可以发现之前可缓存线程池测试结束之后,整个main程序还没有退出,估计是和缓存线程池还没有退出的原因。下面进一步实践,探究该线程池。

    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "-正在执行...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }
        System.out.println("New cached thread test finished!");
        startTime = System.currentTimeMillis();
        long endTime = 0;
        while (cachedThreadPool.isShutdown()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("cachedThreadPool is not shutdown!");
        }
        while (cachedThreadPool.isTerminated()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("cachedThreadPool is not terminated!");
        }
        endTime = System.currentTimeMillis();
        endTime = endTime - startTime;
        System.out.println("上个线程执行完的时间到程序退出时间:" + endTime + " ms");
    }

// 输出
/*
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
New cached thread test finished!
pool-1-thread-2-正在执行...
上个线程执行完的时间到程序退出时间:0 ms
*/

通过上述代码测试发现,缓存线程在执行完线程之后并没有直接退出。while (cachedThreadPool.isShutdown())while (cachedThreadPool.isTerminated()) 两个循环都没有进入。通过手动计时,线程执行结束到整个程序退出的时间正好是60L, TimeUnit.SECONDS, 60秒。

Java中没法在finalize中获取currentTimeMillis,从而获得线程池退出需要用的时间。

等想到办法用代码计时之后再回来补充。

可缓存线程池实践 3

上个实践中提到,线程退出时间是60秒,这个时间有办法调整么?带着这个问题,我看了newCachedThreadPool 调用的源码。

1597547961589

通过这个源码,实现自己的可缓存线程池,修改60s这个参数。

// 替换上面代码实例化线程池的方式
//ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ThreadPoolExecutor cachedThreadPool = 
    new ThreadPoolExecutor(0, Integer.MAX_VALUE,10L, 
                           TimeUnit.SECONDS,
                           new SynchronousQueue<Runnable>());
/*
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
pool-1-thread-2-正在执行...
pool-1-thread-1-正在执行...
New cached thread test finished!
pool-1-thread-2-正在执行...
上个线程执行完的时间到程序退出时间:0 ms
*/

输出结果一样,但是通过手动计时,线程执行结束到整个程序退出的时间差不多10L, TimeUnit.SECONDS, 60秒。

同时,如果采用Executors实例化线程池的,并且使用ali的代码规范的话会提示:

手动创建线程池,效果会更好哦。 (rule: AlibabaJavaConcurrent-ThreadPoolCreationRule)Java P3C

因此,可以多尝试自定义的方式实例化线程池。

2. newFixedThreadPool:

​ 线程的数量是固定的,线程并不会随着任务的多少而变化。Executors.newFixedThreadPool(nThreads);传入参数nThreads,固定线程池的线程个数,包括核心线程数、最大线程数。当线程池满了,就会放入阻塞队列。

流程:

​ FixedThreadPool中最多只有固定数目线程存在,一个线程实例请求加入FixedThreadPool时,如果该实例不存在,且没有达到线程池数目上线,则会创建一个实例,否则,会先加入等待序列,当FixedThreadPool中有一个线程停止并移出线程池后,线程实例才能加入线程池。

​ FixedThreadPool没有超时机制,适用于稳定且并发线程任务。

固定线程池实现

1597548944599

固定线程池实践

package javatest.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        System.out.println("实例化固定线程池");
        for (int i = 0; i < 10; i++) {
            fixedThreadPool.execute(new Runnable() {
                public void run() {
                    try {
                        // 打印正在执行的缓存线程信息
                        System.out.println(Thread.currentThread().getName() + "正在被执行");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        System.out.println("线程任务分配完毕!");
        fixedThreadPool.shutdown();
    }
}
//输出
/*
实例化固定线程池
线程任务分配完毕!
pool-1-thread-3正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-3正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-3正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-2正在被执行
*/

通过上述代码可以发现,线程池中可以调用的有三个线程。本例中的循环没有sleep等待,很快就执行完毕。把线程任务提交给了线程池,但是依旧是三个三个完成整个线程池中的任务。

同时发现,如果不调用fixedThreadPool.shutdown();则程序一直不会退出,而newCachedThreadPool在超过keepAliveTime之后Main方法还是能退出的,也就是线程池关闭。

官方说明

1597551474438

引用:

JAVA线程池原理源码解析—为什么启动一个线程池,提交一个任务后,Main方法不会退出?

1.线程池的创建的时候,第一次submit操作会创建Worker线程(负责去拿任务处理),该线程里写了一个死循环,所以这个Worker线程不会死。 2.Worker线程在创建的时候,被设置成了非守护线程,thread.setDaemon(false)。 3.早在JDK1.5的时候,就规定了当所有非守护线程退出时,JVM才会退出,Main方法主线程和Worker线程都是非守护线程,所以不会死。

3. newSingleThreadExecutor:

​ 只有一个线程的线程池。

​ 核心线程=最大线程=1,创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。这个比较适合需要保证队列中任务顺序执行的场景。

单一线程池实现

1597557115501

单一线程池实践

这个实践中,我们把单线程的休眠时间拉长,保证10个线程都被实例化之后,第一个线程还没执行完。

package javatest.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            System.out.println("实例化第" + i + "个线程");
            singleExecutorService.execute(new Runnable() {
                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    System.out.println(Thread.currentThread().getName() + "正在被执行,打印的值是:" + index);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

//输出
/*
实例化第0个线程
实例化第1个线程
实例化第2个线程
pool-1-thread-1正在被执行,打印的值是:0
实例化第3个线程
实例化第4个线程
实例化第5个线程
实例化第6个线程
实例化第7个线程
实例化第8个线程
实例化第9个线程
pool-1-thread-1正在被执行,打印的值是:1
pool-1-thread-1正在被执行,打印的值是:2
pool-1-thread-1正在被执行,打印的值是:3
pool-1-thread-1正在被执行,打印的值是:4
pool-1-thread-1正在被执行,打印的值是:5
pool-1-thread-1正在被执行,打印的值是:6
pool-1-thread-1正在被执行,打印的值是:7
pool-1-thread-1正在被执行,打印的值是:8
pool-1-thread-1正在被执行,打印的值是:9

*/

通过上述代码可以发现,我们执行的线程都是按照实例化的顺序加入到无界阻塞队列当中,并且也是按照实例化的顺序执行的。

4. newScheduledThreadPool:

​ 延时线程池。ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super最终会调到ThreadPoolExecutor的构造函数,可以看到,最大线程数为int最大值,工作队列为延时队列DelayedWorkQueue,该线程池适合执行延时任务。

参考:

深入理解Java线程池:ScheduledThreadPoolExecutor

延时线程池实现

类的关系图如下:

1597559492056

ScheduledExecutorService调用newScheduleThreadPool返回ScheduleThreadExecutord实例。

1597558719375

ScheduleThreadPoolExecutor继承了ThreadPoolExecutor类,调用是传入的参数也相同。核心线程数,最大线程数,线程超时时间,单位,阻塞队列。

实现如下:

1597558734472

延时线程池实践1

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        int corePoolSize = 4;
        // 创建一个定长线程池,支持定时及周期性任务执行——延迟执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(corePoolSize);
        // 延迟1秒执行
        // scheduledThreadPool.schedule(new Runnable() {
        // public void run() {
        // System.out.println("延迟1秒执行");
        // }
        // }, 1, TimeUnit.SECONDS);

        // 延迟1秒后每3秒执行一次
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("延迟1秒后每3秒执行一次");
            }
        }, 1, 3, TimeUnit.SECONDS);

    }
}
//输出,每三秒执行输出一次。
/*
延迟1秒后每3秒执行一次
延迟1秒后每3秒执行一次
延迟1秒后每3秒执行一次
延迟1秒后每3秒执行一次
...
...
*/

延时线程池实践2

本例除了使用延时线程池外,还展示了如何正确、优雅的关闭线程线程池(shutdown + awaitTermination)。

package javatest.threadpool;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
        for (int i = 0; i < 4; i++) {
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    System.out.println(Thread.currentThread().getName() + "Start running! Time:" + new Date());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "End running! Time:" + new Date());
                }

            }, 1, 5, TimeUnit.SECONDS);
        }
        Thread.sleep(10000);
        System.out.println("Shutting down executor...");
        // 关闭线程池
        scheduledThreadPool.shutdown();
        boolean isDone = false;
        // 等待线程池终止
        do {
            isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("awaitTermination...");
        } while (!isDone);
        System.out.println("Finished all threads");
    }
}


/*
pool-1-thread-1Start running! Time:Sun Aug 16 14:47:49 CST 2020
pool-1-thread-2Start running! Time:Sun Aug 16 14:47:49 CST 2020
pool-1-thread-4Start running! Time:Sun Aug 16 14:47:49 CST 2020
pool-1-thread-3Start running! Time:Sun Aug 16 14:47:49 CST 2020
pool-1-thread-4End running! Time:Sun Aug 16 14:47:50 CST 2020
pool-1-thread-3End running! Time:Sun Aug 16 14:47:50 CST 2020
pool-1-thread-2End running! Time:Sun Aug 16 14:47:50 CST 2020
pool-1-thread-1End running! Time:Sun Aug 16 14:47:50 CST 2020
pool-1-thread-1Start running! Time:Sun Aug 16 14:47:54 CST 2020
pool-1-thread-4Start running! Time:Sun Aug 16 14:47:54 CST 2020
pool-1-thread-3Start running! Time:Sun Aug 16 14:47:54 CST 2020
pool-1-thread-2Start running! Time:Sun Aug 16 14:47:54 CST 2020
pool-1-thread-1End running! Time:Sun Aug 16 14:47:55 CST 2020
pool-1-thread-3End running! Time:Sun Aug 16 14:47:55 CST 2020
pool-1-thread-4End running! Time:Sun Aug 16 14:47:55 CST 2020
pool-1-thread-2End running! Time:Sun Aug 16 14:47:55 CST 2020
Shutting down executor...
awaitTermination...
Finished all threads
*/

5. newWorkStealingPool:

1.8新加的线程池,forkJoinPool 可以根据CPU的核数并行的执行,适合使用在很耗时的操作,可以充分的利用CPU执行任务,任务窃取线程池,不保证执行顺序,适合任务耗时差异较大。

线程池使用

线程池,这一篇或许就够了

Java线程池的正确关闭方法,awaitTermination还不够

ThreadPoolExecutor 中的 shutdown() 、awaitTermination() 、 shutdownNow() 的用法

awaitTermination使用:

传入时间和单位,在这个时间结束之前,将会阻塞直到所有任务完成。正常完成返回true

如果超过设置的时间,则结束阻塞,返回false

如果阻塞被中断,则抛出中断异常。

1597560605066

线程名字

线程都有自己的默认名称,也可以自己设置。

Thread-编号(默认从0开始)。

Sleep方法

sleep方法,暂停当前线程,把cpu片段让出给其他线程,减缓当前线程的执行。

但是如果当前线程占有了锁,线程不会释放锁。

sleep过后,不是直接到了运行状态,而是到就绪(可运行)状态。