博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一起学并发编程 - 简易线程池实现
阅读量:6936 次
发布时间:2019-06-27

本文共 10742 字,大约阅读时间需要 35 分钟。

开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5以后的java.util.concurrent包中内置了很多不同使用场景的线程池,为了更好的理解它们,自己手写一个线程池,加深印象。

<!-- more -->

概述

1.什么是池

它的基本思想是一种对象池,程序初始化的时候开辟一块内存空间,里面存放若干个线程对象,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省系统的资源。

2.使用线程池的好处

合理的使用线程池可以重复利用已创建的线程,这样就可以减少在创建线程和销毁线程上花费的时间和资源。并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。这样就相当于我们一次创建,就可以多次使用,大量的节省了系统频繁的创建和销毁线程所需要的资源。

简易版实现

简易版

包含功能:

1.创建线程池,销毁线程池,添加新任务

2.没有任务进入等待,有任务则处理掉

3.动态伸缩,扩容

4.拒绝策略

介绍了线程池的原理以及主要组件之后,就让我们来手动实现一个自己的线程池,以加深理解和深入学习。因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent会更加健壮和优雅(后续文章会介绍)

代码

以下线程池相关代码均在SimpleThreadPoolExecutor.java中,由于为了便于解读因此以代码块的形式呈现

维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.

private enum TaskState {    FREE, RUNNABLE, BLOCKED, TERMINATED;}

定义拒绝策略接口,以及默认实现

static class DiscardException extends RuntimeException {    private static final long serialVersionUID = 8827362380544575914L;    DiscardException(String message) {        super(message);    }}interface DiscardPolicy {//拒绝策略接口    void discard() throws DiscardException;}

任务线程具体实现

1.继承Thread,重写run方法。

2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权

3.如果有任务就去执行FIFO(先进先出)策略

4.定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt

public static class WorkerTask extends Thread {    // 线程状态    private TaskState taskState;    // 线程编号    private static int threadInitNumber;    /**     * 生成线程名,参考Thread.nextThreadNum();     *     * @return     */    private static synchronized String nextThreadName() {        return THREAD_NAME_PREFIX + (++threadInitNumber);    }    WorkerTask() {        super(THREAD_GROUP, nextThreadName());    }    @Override    public void run() {        Runnable target;        //说明该线程处于空闲状态        OUTER:        while (this.taskState != TaskState.TERMINATED) {            synchronized (TASK_QUEUE) {                while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {                    try {                        this.taskState = TaskState.BLOCKED;//此处标记                        //没有任务就wait住,让出CPU执行权                        TASK_QUEUE.wait();                        //如果被打断说明当前线程执行了 shutdown() 方法  线程状态为 TERMINATED 直接跳到 while 便于退出                    } catch (InterruptedException e) {                        break OUTER;                    }                }                target = TASK_QUEUE.removeFirst();//遵循FIFO策略            }            if (target != null) {                this.taskState = TaskState.RUNNABLE;                target.run();//开始任务了                this.taskState = TaskState.FREE;            }        }    }    void close() {//优雅关闭线程        this.taskState = TaskState.TERMINATED;        this.interrupt();    }}

简易版线程池,主要就是维护了一个任务队列线程集,为了动态扩容,自己也继承了Thread去做监听操作,对外提供submit()提交执行任务shutdown()等待所有任务工作完毕,关闭线程池

public class SimpleThreadPoolExecutor extends Thread {    // 线程池大小    private int threadPoolSize;    // 最大接收任务    private int queueSize;    // 拒绝策略    private DiscardPolicy discardPolicy;    // 是否被销毁    private volatile boolean destroy = false;    private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默认最小线程数    private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活跃线程    private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大线程    private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多执行多少任务    private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//线程名前缀    private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//线程组的名称    private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//线程组    private final static List
WORKER_TASKS = new ArrayList<>();// 线程容器 // 任务队列容器,也可以用Queue
遵循 FIFO 规则 private final static LinkedList
TASK_QUEUE = new LinkedList<>(); // 拒绝策略 private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException("[拒绝执行] - [任务队列溢出...]"); }; private int minSize;//最小线程 private int maxSize;//最大线程 private int activeSize;//活跃线程 SimpleThreadPoolExecutor() { this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY); } SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){ this.minSize = minSize; this.activeSize = activeSize; this.maxSize = maxSize; this.queueSize = queueSize; this.discardPolicy = discardPolicy; initPool(); } void submit(Runnable runnable) { if (destroy) { throw new IllegalStateException("线程池已销毁..."); } synchronized (TASK_QUEUE) { if (TASK_QUEUE.size() > queueSize) {//如果当前任务队超出队列限制,后续任务拒绝执行 discardPolicy.discard(); } // 1.将任务添加到队列 TASK_QUEUE.addLast(runnable); // 2.唤醒等待的线程去执行任务 TASK_QUEUE.notifyAll(); } } void shutdown() throws InterruptedException { int activeCount = THREAD_GROUP.activeCount(); while (!TASK_QUEUE.isEmpty() && activeCount > 0) { // 如果还有任务,那就休息一会 Thread.sleep(100); } int intVal = WORKER_TASKS.size();//如果线程池中没有线程,那就不用关了 while (intVal > 0) { for (WorkerTask task : WORKER_TASKS) { //当任务队列为空的时候,线程状态才会为 BLOCKED ,所以可以打断掉,相反等任务执行完在关闭 if (task.taskState == TaskState.BLOCKED) { task.close(); intVal--; } else { Thread.sleep(50); } } } this.destroy = true; //资源回收 TASK_QUEUE.clear(); WORKER_TASKS.clear(); this.interrupt(); System.out.println("线程关闭"); } private void createWorkerTask() { WorkerTask task = new WorkerTask(); //刚创建出来的线程应该是未使用的 task.taskState = TaskState.FREE; WORKER_TASKS.add(task); task.start(); } /** * 初始化操作 */ private void initPool() { for (int i = 0; i < this.minSize; i++) { this.createWorkerTask(); } this.threadPoolSize = minSize; this.start();//自己启动自己 } @Override public void run() { while (!destroy) { try { Thread.sleep(5_000L); if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次扩容到 activeSize 大小 for (int i = threadPoolSize; i < activeSize; i++) { createWorkerTask(); } this.threadPoolSize = activeSize; System.out.println("[初次扩充] - [" + toString() + "]"); } else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次扩容到最大线程 System.out.println(); for (int i = threadPoolSize; i < maxSize; i++) { createWorkerTask(); } this.threadPoolSize = maxSize; System.out.println("[再次扩充] - [" + toString() + "]"); } else { //防止线程在submit的时候,其他线程获取到锁干坏事 synchronized (WORKER_TASKS) { int releaseSize = threadPoolSize - activeSize; Iterator
iterator = WORKER_TASKS.iterator();// List不允许在for中删除集合元素,所以这里需要使用迭代器 while (iterator.hasNext()) { if (releaseSize <= 0) { break; } WorkerTask task = iterator.next(); //不能回收正在运行的线程,只回收空闲线程 if (task.taskState == TaskState.FREE) { task.close(); iterator.remove(); releaseSize--; } } System.out.println("[资源回收] - [" + toString() + "]"); } threadPoolSize = activeSize; } } catch (InterruptedException e) { System.out.println("资源释放"); } } } @Override public String toString() { return "SimpleThreadPoolExecutor{" + "threadPoolSize=" + threadPoolSize + ", taskQueueSize=" + TASK_QUEUE.size() + ", minSize=" + minSize + ", maxSize=" + maxSize + ", activeSize=" + activeSize + '}'; }}

测试一把

创建一个测试类

public class SimpleExecutorTest {    public static void main(String[] args) throws InterruptedException {        SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();        IntStream.range(0, 30).forEach(i ->                executor.submit(() -> {                    System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName());                    try {                        Thread.sleep(2_000L);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName());                })        );        //executor.shutdown();如果放开注释即会执行完所有任务关闭线程池    }}

日志分析: 从日志中可以看到,初始化的时候是2个线程在工作,执行速度较为缓慢,当经过第一次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize,在执行任务的过程中,当线程数过多的时候就会触发回收机制...

[线程] - [MY-THREAD-NAME-1] 开始工作...[线程] - [MY-THREAD-NAME-2] 开始工作...[线程] - [MY-THREAD-NAME-1] 工作完毕...[线程] - [MY-THREAD-NAME-1] 开始工作...[线程] - [MY-THREAD-NAME-2] 工作完毕...[线程] - [MY-THREAD-NAME-2] 开始工作...[线程] - [MY-THREAD-NAME-1] 工作完毕...[线程] - [MY-THREAD-NAME-1] 开始工作...[线程] - [MY-THREAD-NAME-2] 工作完毕...[线程] - [MY-THREAD-NAME-2] 开始工作...[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}][线程] - [MY-THREAD-NAME-3] 开始工作......[线程] - [MY-THREAD-NAME-6] 开始工作...[线程] - [MY-THREAD-NAME-7] 开始工作...[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}][线程] - [MY-THREAD-NAME-10] 开始工作......[线程] - [MY-THREAD-NAME-5] 开始工作...[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}][线程] - [MY-THREAD-NAME-4] 工作完毕......[线程] - [MY-THREAD-NAME-7] 工作完毕...[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}][资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]

总结

通过本文,大致可以了解线程池的工作原理和实现方式,学习的过程中,就是要知其然知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通,后面的文章中会详细介绍java.util.concurrent中的线程池

- 说点什么

全文代码:

  • 个人QQ:1837307557
  • battcn开源群(适合新手):391619659

微信公众号:battcn(欢迎调戏)

转载地址:http://mcbnl.baihongyu.com/

你可能感兴趣的文章
java 运算顺序
查看>>
天涯LVS部署
查看>>
eclipse不能自动编译工程的解决方法
查看>>
最好用的cisco路由模拟器 debianIOL
查看>>
Shpinx在PHPCMS里的使用及配置
查看>>
Linux Oracle Rac 10G 搭建& Patch
查看>>
django models.py模块的外部引用
查看>>
VMware虚拟化技术培训(8) 虚拟机管理之二
查看>>
spring内部各模块jar包依赖
查看>>
Apache与Nginx网络模型对比
查看>>
Java 二重循环实现对象去重
查看>>
Supporting Python 3(支持python3)——序
查看>>
从零开始-打造自己的虚拟实验室-2
查看>>
js 完美兼容浏览器的复制功能
查看>>
jdk1.6下使用sardine和jackrabbit-webdav的问题
查看>>
[Unity3d]socket通信 切换到web版本时报错SecurityException解决办法
查看>>
[Unity3D插件]2dtoolkit系列二 动画精灵的创建以及背景图的无限滚动
查看>>
谈谈spring中bean的名字
查看>>
Vue Element表单绑定(二)表单验证1
查看>>
Unix sed笔记
查看>>