0%

线程池

​ 最近跟着网上做电商项目,在获取商品信息时,使用线程池开启异步任务,并通过CompletableFuture对异步任务进行编排。所以记录一下。

​ 先记录一下线程池


线程池

​ 创建线程总共有4种方式: 继承Thread类、实现Runnable接口、实现Callable接口(FutureTask)、线程池。CPU执行程序时,会不停的切换线程上下文,线程的创建和销毁会带来过度的开销,使用线程池可以提高整体性能,减少线程创建和销毁的处理时间,并且使用线程池可以很好的管理线程。

##### 创建线程池

​ 创建线程池可以使用api中的Executors来进行创建,不过最好使用自定义线程池,即自己new一个线程池(ThreadPoolExecutor(…))。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

​ 构造方法中的参数含义

1
2
3
4
5
6
7
corePoolSize #the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set //保留在池中的线程数量,即使它们空闲,除非设置了容许核心线程超时。核心线程数,最少的线程数
maximumPoolSize: 即池中允许的最大线程数
keepAliveTime & TimeUnit: 当线程数大于核心线程数时,空闲线程在终止前等待新任务的时间,就是空闲线程的存活时间, 后面是规定的单位
BlockingQueue: 线程的阻塞队列,用来存放那些被方法提交但是还没有执行的任务
ThreadFactory: 执行器创建一个线程使用的线程工厂
RejectedExecutionHandler: 当线程池满了或者阻塞队列满了的时候,有新任务提交过来时,执行的拒绝策略

​ 重点记录一下拒绝策略

RejectedExecutionHandler

​ 点开源码,RejectedExecutionHandler是一个接口,看子类实现,有四种。

threadpool

1
2
3
4
DiscardOldestPolicy ## 丢弃队列中最老的没有被执行的任务,然后再重新执行被拒绝的任务
AbortPolicy ## A handler for rejected tasks that throws a RejectedExecutionException //即抛出一个异常
CallerRunsPolicy ## 直接在执行方法的调用线程中执行被拒绝的任务,如果执行程序关闭了,则直接丢弃任务
DiscardPolicy ## A handler for rejected tasks that silently discards the rejected task. //默默丢弃。。。 就是直接丢弃被拒绝的任务。

​ 当使用Executors类来创建线程池时,jdk使用的是哪个拒绝策略呢?

1
2
3
4
5
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //抛出异常
线程池的工作流程

​ 直接看图

work

​ 执行流程:

     1. 主线程执行execute() 方法执行一个任务,此时核心线程池中的工作线程数 < 核心线程数, 则直接创建线程并执行任务;
     2. 此时有一个任务来了,结果发现核心线程池满了,就会进入阻塞队列中;
     3. 如果又有新的任务提交,发现队列满了,并且线程池的最大线程数还没有满,则直接创建线程,并执行任务;
     4. 如果此时队列和线程池都满了,则执行拒绝策略。

CompletableFuture

​ 该类可以实现异步任务,对线程任务进行异步编排。例如,在网页上点击某一个商品,就会跳转到商品的详情页,需要查询商品的名字、介绍、图片、品牌等等,如果没有异步任务,一个一个执行会非常的慢,因此,使用异步编排,会节约很多时间。

​ 来记录一下该类的一些方法的使用,先创建一个自定义线程池。

1
2
3
4
5
6
7
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
5,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
基本方法

​ 先看几个基本方法。

1
2
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor); //异步执行线程,并且无法接收任务执行完后的返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); //可以接收任务执行完后的返回值

supplyAsync() 方法的返回值,如何取到异步任务的数据,可以使用get() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
   @Test
public void test01() throws ExecutionException, InterruptedException {
CompletableFuture.runAsync(() -> {
System.out.println("执行异步任务...");
}, executor);

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("异步任务执行后的结果是: " + i);
return i;
}, executor);

System.out.println("获取异步任务中返回的数据: " + future.get());
}

/****输出结果****/
/***************************/
/* 执行异步任务... */
/* 异步任务执行后的结果是: 5 */
/* 获取异步任务中返回的数据: 5 */
/***************************/

​ 通过thenComplete()得到执行后的结果信息,但是无法感知异常,并且无法对结果进行修改,使用exceptionally()可以用来感知异常,并返回发生异常时的默认值。

1
2
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) //BigConsumer有两个参数,一个是结果,一个是异常
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) //传入一个throwable

​ 试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    @Test
public void test01() throws ExecutionException, InterruptedException {
// CompletableFuture.runAsync(() -> {
// System.out.println("执行异步任务...");
// }, executor);

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0; //模拟异常
System.out.println("异步任务执行后的结果是: " + i);
return i;
}, executor).whenComplete((res,throwable) -> {
System.out.println("运行结果是: " + res);
System.out.println("异常结果是: " + throwable);
}).exceptionally((throwable -> {
return -1;
}));

System.out.println("获取异步任务中返回的数据: " + future.get());
}

/****输出结果****/
/***************************/
/* 运行结果是: null */
/* 异常结果是: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero */
/* 获取异步任务中返回的数据: -1 */
/***************************/

​ 可以直接使用handle方法对异常感知,并修改结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   @Test
public void test02() throws ExecutionException, InterruptedException {

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
System.out.println("异步任务执行后的结果是: " + i);
return i;
}, executor).handle( (res, throwable) -> {
if (res != null) {
return res;
}
if (throwable != null) {
return -1;
}
return -1;
});

System.out.println("获取异步任务中返回的数据: " + future.get());
}

/****输出结果****/
/***************************/
/* 获取异步任务中返回的数据: -1 */
/***************************/
串行化
###### 一个任务

​ 有两个线程任务,只有当一个线程执行任务完成后,才会执行第二个任务。主要有三个方法,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 线程的串行化
* 当执行完一个线程任务后,才能执行后面的线程
*
* 后面没有加Async表示后前面的任务共用一个线程(后面的都是一样的)
*
* thenRun: 不用获取上一步的执行结果
* public CompletableFuture<Void> thenRun(Runnable action)
* public CompletableFuture<Void> thenRunAsync(Runnable action)
* public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
*
* thenAccept: 可以获取前一个任务执行的结果
* thenApply: 可以获取前任务的执行结果,也可以返回当前任务的执行结果
*
*/

​ 一个一个来试一试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  //public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) ## Runnable接收不了参数也没有返回值
@Test
public void test03() {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int sum = 2 + 3;
System.out.println("第一个任务的执行结果是: " + sum);
return sum;
}, executor).thenRunAsync(() -> {
System.out.println("第二个任务执行了...");
}, executor);

// 输出结果
// 第一个任务的执行结果是: 5
// 第二个任务执行了...
}


// public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
// Consumer<? super T> 可以传入一个参数,并且这个参数是上一个结果的父类,并且无返回值: void accept(T t)
@Test
public void test04() {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int sum = 2 + 3;
System.out.println("第一个任务的执行结果是: " + sum);
return sum;
}, executor).thenAcceptAsync((res) -> {
System.out.println("第二个任务执行了...第一个任务的结果是:" + res);
}, executor);

// 输出结果
//第一个任务的执行结果是: 5
// 第二个任务执行了...第一个任务的结果是:5
}


// public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
// 传入的Function类,有参数,也有返回值。
@Test
public void test03() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int sum = 2 + 3;
System.out.println("第一个任务的执行结果是: " + sum);
return sum;
}, executor).thenApplyAsync((res) -> {
System.out.println("第二个任务执行了...第一个任务的结果是:" + res);
int i = 10;
return i;
}, executor);

System.out.println("得到第二个任务的结果: " + future.get());


// 输出结果
// 第一个任务的执行结果是: 5
// 第二个任务执行了...第一个任务的结果是:5
// 得到第二个任务的结果: 10
}
两个任务

​ 第三个任务的完成需要前面两个任务都完成或者只完成一个。

1. 两个任务都完成

​ 和一个任务类似,有三种形式的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 无法接收前两个任务的结果,并且第三个任务也无法返回结果    
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, //另外一个任务
Runnable action, //第三个任务
Executor executor) //线程池

//可以接收前两个任务的结果,但是无法返回结果
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)

//可以接收前两个任务的结果,并且返回自己的结果
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)

例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
    //现在有两个任务
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("第一个任务的结果是: " + a);
return a;
}, executor);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int b = 8 / 4;
System.out.println("第二个任务的结果是: " + b);
return b;
}, executor);

//================================================================//

//runAfterBothAsync()
public void test() {
f1.runAfterBothAsync(f2, () -> {
System.out.println("第三个任务执行了..."); //print "第三个任务执行了..."
}, executor);
}

//thenAcceptBothAsync()
public void test() {
f1.thenAcceptBothAsync(f2, (res1,res2) -> {
System.out.println("第三个任务拿到了数据,第一个任务的数据=>>" + res1 + ",第二个任务的数据=>>" + res2);
//print 第三个任务拿到了数据,第一个任务的数据=>>5,第二个任务的数据=>>2
},executor);
}

//thenCombineAsync()
public void test() throw Exception {
CompletableFuture<Integer> future = f1.thenCombineAsync(f2, (res1, res2) -> {
System.out.println("第三个任务拿到了数据,第一个任务的数据=>>" + res1 + ",第二个任务的数据=>>" + res2);
int myRes = res1 + res2;
return myRes;
}, executor);

System.out.println("第三个任务的结果是: " + future.get());
}


// 输出结果
// 第三个任务拿到了数据,第一个任务的数据=>>5,第二个任务的数据=>>2
// 第三个任务的结果是: 7

2. 一个任务完成

​ 只要前两个任务有一个任务完成,就会执行第三个任务,同样有三种方法,记录一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 无法接收前两个任务中某一个的结果,并且第三个任务也无法返回结果    
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor)

//可以接收前两个任务中某一个的结果,但是无法返回结果
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)

//可以接收前两个任务中某一个的结果,并且返回自己的结果
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)

例子如下,还是用前面的两个任务,给其中一个线程加睡眠时间,模拟一个线程成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
    //现在有两个任务
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
try {
Thread.sleep(3000); //睡眠3秒
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("第一个任务的结果是: " + a);
return a;
}, executor);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int b = 8 / 4;
System.out.println("第二个任务的结果是: " + b);
return b;
}, executor);

//================================================================//

//runAfterEitherAsync()
public void test() {
f1.runAfterEitherAsync(f2, () -> {
System.out.println("第三个任务执行了。。。");
}, executor);

//结果
// 第二个任务的结果是: 2
// 第三个任务执行了。。。
// 第一个任务的结果是: 5
}

//acceptEitherAsync()
public void test() {
f1.acceptEitherAsync(f2, (res) -> {
System.out.println("第三个任务执行了。。。得到数据: " + res);
}, executor);

//结果
// 第二个任务的结果是: 2
// 第三个任务执行了。。。得到数据: 2
// 第一个任务的结果是: 5

}

//applyToEitherAsync
public void test() throw Exception{
CompletableFuture<Integer> future = f1.applyToEitherAsync(f2, res -> {
System.out.println("第三个任务执行了。。。得到的数据是: " + res);
int i = 12;
return i;
}, executor);

System.out.println("第三个任务的数据是: " + future.get());

// 输出结果
// 第二个任务的结果是: 2
// 第三个任务执行了。。。得到的数据是: 2
// 第三个任务的数据是: 12
// 第一个任务的结果是: 5
}
多任务组合

1. allOf

​ 多个任务执行,只有这些任务都完成,才算执行完毕,调用allOf方法会返回一个CompletableFuture对象,通过get() 方法,进行阻塞,只有当所有任务完成之后,程序才会往下走。

1
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) //可以传入多个任务

​ 假设现在有多个任务执行,查询手机品牌、查询手机颜色、查询手机参数。让其中一个任务睡眠3秒钟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询手机品牌..");
return "huawei";
}, executor);

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("查询手机颜色..");
return "淡雅紫";
}, executor);

CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询手机参数..");
return "8GB+128GB";
}, executor);

public void test() throw Exception{
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
allOf.get();
System.out.println("end......");
}

//输出结果
//查询手机品牌..
//查询手机参数..
//查询手机颜色..
//end......

2. anyOf

​ 多个任务执行,只要有一个完成,就会执行完毕,调用anyOf方法会返回一个CompletableFuture对象,通过get() 方法,进行阻塞,只要当有一个任务完成之后,程序就会往下走。

1
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

和上述一样,test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
      CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询手机品牌..");
return "huawei";
}, executor);

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("查询手机颜色..");
return "淡雅紫";
}, executor);

CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询手机参数..");
return "8GB+128GB";
}, executor);

public void test() throw Exception{
CompletableFuture<Void> anyOf = CompletableFuture.anyOf(f1, f2, f3);
anyOf.get();
System.out.println("end......");
}

//输出结果
//查询手机品牌..
//end......
//查询手机参数..
//查询手机颜色..


​ 根据实际场景选择对应的方法,有些任务,需要前面任务完成才能做,这时候需要使用串行化。有些任务,只需要将所有任务完成,任务之间没有关系,可以使用allOf最后对所有任务进行提交。

-------------本文结束感谢您的阅读-------------