服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 学生视角手把手带你写Java 线程池改良版

学生视角手把手带你写Java 线程池改良版

2022-10-12 12:53摸鱼打酱油 Java教程

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门

Java手写线程池(第二代)

第二代线程池的优化

1:新增了4种拒绝策略。分别为:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2:对线程池MyThreadPoolExecutor的构造方法进行优化,增加了参数校验,防止乱传参数现象。

3:这是最重要的一个优化。

  • 移除线程池的线程预热功能。因为线程预热会极大的耗费内存,当我们不用线程池时也会一直在运行状态。
  • 换来的是在调用execute方法添加任务时通过检查workers线程集合目前的大小与corePoolSize的值去比较,再通过new MyWorker()去创建添加线程到线程池,这样好处就是当我们创建线程池如果不使用的话则对当前内存没有一点影响,当使用了才会创建线程并放入线程池中进行复用。

线程池构造器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public MyThreadPoolExecutor(){
    this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
    this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
    this.workers=new HashSet<>(corePoolSize);
    if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
        this.corePoolSize=corePoolSize;
        this.waitingQueue=waitingQueue;
        this.threadFactory=threadFactory;
        this.handle=handle;
    }else {
        throw new NullPointerException("线程池参数不合法");
    }
}

线程池拒绝策略

策略接口:MyRejectedExecutionHandle

?
1
2
3
4
5
6
7
8
9
10
11
package com.springframework.concurrent;
 
/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {
 
    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);
 
}

策略内部实现类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){
 
        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{
 
        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
 
        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){
 
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }

封装拒绝方法

?
1
2
3
4
5
6
7
protected final void reject(Runnable runnable){
    this.handle.rejectedExecution(runnable, this);
}
 
protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
    this.handle.rejectedExecution(runnable, threadPoolExecutor);
}

execute方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public boolean execute(Runnable runnable)
{
    if (!this.waitingQueue.offer(runnable)) {
        this.reject(runnable);
        return false;
    }
    else {
        if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
            MyWorker worker = new MyWorker(); //通过构造方法添加线程
        }
        return true;
    }
}

可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.springframework.concurrent;
 
import java.util.concurrent.BlockingQueue;
 
/**
 * 自定义线程池业务接口
 * @author 游政杰
 */
public interface MyExecutorService {
 
    boolean execute(Runnable runnable);
 
    void shutdown();
 
    void shutdownNow();
 
    boolean isShutdown();
 
    BlockingQueue<Runnable> getWaitingQueue();
 
}

MyRejectedExecutionException

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.springframework.concurrent;
 
/**
 * 自定义拒绝异常
 */
public class MyRejectedExecutionException extends RuntimeException {
 
    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }
 
    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
 
    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }
 
}

MyRejectedExecutionHandle

?
1
2
3
4
5
6
7
8
9
10
11
package com.springframework.concurrent;
 
/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {
 
    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);
 
}

核心类MyThreadPoolExecutor

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package com.springframework.concurrent;
 
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 纯手撸线程池框架
 * @author 游政杰
 */
public class MyThreadPoolExecutor implements MyExecutorService{
 
    private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数
    private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号
    private static volatile int corePoolSize; //核心线程数
    private final HashSet<MyWorker> workers; //工作线程
    private final BlockingQueue<Runnable> waitingQueue; //等待队列
    private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称
    private volatile boolean isRunning=true; //是否运行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private volatile ThreadFactory threadFactory; //线程工厂
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略
    private volatile MyRejectedExecutionHandle handle; //拒绝紫略
 
    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }
    /**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){
 
        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{
 
        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
 
        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){
 
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }
    //call拒绝方法
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }
 
    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }
 
    /**
     * MyWorker就是我们每一个线程对象
     */
    private final class MyWorker implements Runnable{
 
        final Thread thread; //为每个MyWorker
 
        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }
 
        //执行任务
        @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    //循环退出条件:
                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。
                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不断取任务,当任务!=null时则调用run方法处理任务
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }
 
    //往线程池中放任务
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }
    //优雅的关闭
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力关闭
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }
 
    //判断线程池是否关闭
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }
 
    //获取等待队列
    @Override
    public BlockingQueue<Runnable> getWaitingQueue() {
        return this.waitingQueue;
    }
}

线程池测试类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.springframework.test;
 
import com.springframework.concurrent.MyThreadPoolExecutor;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
 
public class ThreadPoolTest {
 
  public static void main(String[] args) {
 
 
//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());
 
//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());
 
//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());
 
      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());
 
 
      for(int i=0;i<11;i++){
 
          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });
 
      }
 
      myThreadPoolExecutor.shutdown();
 
//      myThreadPoolExecutor.shutdownNow();
 
 
 
 
  }
}

好了第二代线程池就优化到这了,后面可能还会出第三代,不断进行优化。

到此这篇关于学生视角手把手带你写Java 线程池改良版的文章就介绍到这了,更多相关Java 线程池内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/weixin_50071998/article/details/123595379

延伸 · 阅读

精彩推荐