最近跟着网上做电商项目,在获取商品信息时,使用线程池开启异步任务,并通过CompletableFuture对异步任务进行编排。所以记录一下。
先记录一下线程池
线程池 创建线程总共有4种方式: 继承Thread类、实现Runnable接口、实现Callable接口(FutureTask)、线程池。CPU执行程序时,会不停的切换线程上下文,线程的创建和销毁会带来过度的开销,使用线程池可以提高整体性能,减少线程创建和销毁的处理时间,并且使用线程池可以很好的管理线程。
##### 创建线程池
创建线程池可以使用api中的Executors来进行创建,不过最好使用自定义线程池,即自己new一个线程池(ThreadPoolExecutor(…))。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
构造方法中的参数含义
1 2 3 4 5 6 7 corePoolSize #the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set maximumPoolSize: 即池中允许的最大线程数 keepAliveTime & TimeUnit: 当线程数大于核心线程数时,空闲线程在终止前等待新任务的时间,就是空闲线程的存活时间, 后面是规定的单位 BlockingQueue: 线程的阻塞队列,用来存放那些被方法提交但是还没有执行的任务 ThreadFactory: 执行器创建一个线程使用的线程工厂 RejectedExecutionHandler: 当线程池满了或者阻塞队列满了的时候,有新任务提交过来时,执行的拒绝策略
重点记录一下拒绝策略
RejectedExecutionHandler 点开源码,RejectedExecutionHandler是一个接口,看子类实现,有四种。
1 2 3 4 DiscardOldestPolicy ## 丢弃队列中最老的没有被执行的任务,然后再重新执行被拒绝的任务 AbortPolicy ## A handler for rejected tasks that throws a RejectedExecutionException //即抛出一个异常 CallerRunsPolicy ## 直接在执行方法的调用线程中执行被拒绝的任务,如果执行程序关闭了,则直接丢弃任务 DiscardPolicy ## A handler for rejected tasks that silently discards the rejected task. //默默丢弃。。。 就是直接丢弃被拒绝的任务。
当使用Executors类来创建线程池时,jdk使用的是哪个拒绝策略呢?
1 2 3 4 5 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
线程池的工作流程 直接看图
执行流程:
1. 主线程执行execute() 方法执行一个任务,此时核心线程池中的工作线程数 < 核心线程数, 则直接创建线程并执行任务;
2. 此时有一个任务来了,结果发现核心线程池满了,就会进入阻塞队列中;
3. 如果又有新的任务提交,发现队列满了,并且线程池的最大线程数还没有满,则直接创建线程,并执行任务;
4. 如果此时队列和线程池都满了,则执行拒绝策略。
CompletableFuture 该类可以实现异步任务,对线程任务进行异步编排。例如,在网页上点击某一个商品,就会跳转到商品的详情页,需要查询商品的名字、介绍、图片、品牌等等,如果没有异步任务,一个一个执行会非常的慢,因此,使用异步编排,会节约很多时间。
来记录一下该类的一些方法的使用,先创建一个自定义线程池。
1 2 3 4 5 6 7 ThreadPoolExecutor executor = new ThreadPoolExecutor(5 , 5 , 10 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(5 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
基本方法 先看几个基本方法。
1 2 public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) ; public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) ;
supplyAsync() 方法的返回值,如何取到异步任务的数据,可以使用get() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Test public void test01 () throws ExecutionException, InterruptedException { CompletableFuture.runAsync(() -> { System.out.println("执行异步任务..." ); }, executor); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 10 / 2 ; System.out.println("异步任务执行后的结果是: " + i); return i; }, executor); System.out.println("获取异步任务中返回的数据: " + future.get()); }
通过thenComplete()得到执行后的结果信息,但是无法感知异常,并且无法对结果进行修改,使用exceptionally()可以用来感知异常,并返回发生异常时的默认值。
1 2 public CompletableFuture<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn)
试一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Test public void test01 () throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 10 / 0 ; System.out.println("异步任务执行后的结果是: " + i); return i; }, executor).whenComplete((res,throwable) -> { System.out.println("运行结果是: " + res); System.out.println("异常结果是: " + throwable); }).exceptionally((throwable -> { return -1 ; })); System.out.println("获取异步任务中返回的数据: " + future.get()); }
可以直接使用handle方法对异常感知,并修改结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Test public void test02 () throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 10 / 0 ; System.out.println("异步任务执行后的结果是: " + i); return i; }, executor).handle( (res, throwable) -> { if (res != null ) { return res; } if (throwable != null ) { return -1 ; } return -1 ; }); System.out.println("获取异步任务中返回的数据: " + future.get()); }
串行化 ###### 一个任务
有两个线程任务,只有当一个线程执行任务完成后,才会执行第二个任务。主要有三个方法,如下
一个一个来试一试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Test public void test03 () { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int sum = 2 + 3 ; System.out.println("第一个任务的执行结果是: " + sum); return sum; }, executor).thenRunAsync(() -> { System.out.println("第二个任务执行了..." ); }, executor); } @Test public void test04 () { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int sum = 2 + 3 ; System.out.println("第一个任务的执行结果是: " + sum); return sum; }, executor).thenAcceptAsync((res) -> { System.out.println("第二个任务执行了...第一个任务的结果是:" + res); }, executor); } @Test public void test03 () throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int sum = 2 + 3 ; System.out.println("第一个任务的执行结果是: " + sum); return sum; }, executor).thenApplyAsync((res) -> { System.out.println("第二个任务执行了...第一个任务的结果是:" + res); int i = 10 ; return i; }, executor); System.out.println("得到第二个任务的结果: " + future.get()); }
两个任务 第三个任务的完成需要前面两个任务都完成或者只完成一个。
1. 两个任务都完成
和一个任务类似,有三种形式的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other, //另外一个任务 Runnable action, //第三个任务 Executor executor) public <U> CompletableFuture<Void> thenAcceptBothAsync ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) public <U,V> CompletableFuture<V> thenCombineAsync ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
例子如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int a = 10 / 2 ; System.out.println("第一个任务的结果是: " + a); return a; }, executor); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int b = 8 / 4 ; System.out.println("第二个任务的结果是: " + b); return b; }, executor); public void test () { f1.runAfterBothAsync(f2, () -> { System.out.println("第三个任务执行了..." ); }, executor); } public void test () { f1.thenAcceptBothAsync(f2, (res1,res2) -> { System.out.println("第三个任务拿到了数据,第一个任务的数据=>>" + res1 + ",第二个任务的数据=>>" + res2); },executor); } public void test () throw Exception { CompletableFuture<Integer> future = f1.thenCombineAsync(f2, (res1, res2) -> { System.out.println("第三个任务拿到了数据,第一个任务的数据=>>" + res1 + ",第二个任务的数据=>>" + res2); int myRes = res1 + res2; return myRes; }, executor); System.out.println("第三个任务的结果是: " + future.get()); }
2. 一个任务完成
只要前两个任务有一个任务完成,就会执行第三个任务,同样有三种方法,记录一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other, Runnable action, Executor executor) public CompletableFuture<Void> acceptEitherAsync ( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) public <U> CompletableFuture<U> applyToEitherAsync ( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
例子如下,还是用前面的两个任务,给其中一个线程加睡眠时间,模拟一个线程成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int a = 10 / 2 ; try { Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } System.out.println("第一个任务的结果是: " + a); return a; }, executor); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int b = 8 / 4 ; System.out.println("第二个任务的结果是: " + b); return b; }, executor); public void test () { f1.runAfterEitherAsync(f2, () -> { System.out.println("第三个任务执行了。。。" ); }, executor); } public void test () { f1.acceptEitherAsync(f2, (res) -> { System.out.println("第三个任务执行了。。。得到数据: " + res); }, executor); } public void test () throw Exception { CompletableFuture<Integer> future = f1.applyToEitherAsync(f2, res -> { System.out.println("第三个任务执行了。。。得到的数据是: " + res); int i = 12 ; return i; }, executor); System.out.println("第三个任务的数据是: " + future.get()); }
多任务组合 1. allOf
多个任务执行,只有这些任务都完成,才算执行完毕,调用allOf方法会返回一个CompletableFuture对象,通过get() 方法,进行阻塞,只有当所有任务完成之后,程序才会往下走。
1 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs)
假设现在有多个任务执行,查询手机品牌、查询手机颜色、查询手机参数。让其中一个任务睡眠3秒钟
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询手机品牌.." ); return "huawei" ; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } System.out.println("查询手机颜色.." ); return "淡雅紫" ; }, executor); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { System.out.println("查询手机参数.." ); return "8GB+128GB" ; }, executor); public void test () throw Exception { CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3); allOf.get(); System.out.println("end......" ); }
2. anyOf
多个任务执行,只要有一个完成,就会执行完毕,调用anyOf方法会返回一个CompletableFuture对象,通过get() 方法,进行阻塞,只要当有一个任务完成之后,程序就会往下走。
1 public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs)
和上述一样,test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询手机品牌.." ); return "huawei" ; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } System.out.println("查询手机颜色.." ); return "淡雅紫" ; }, executor); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { System.out.println("查询手机参数.." ); return "8GB+128GB" ; }, executor); public void test () throw Exception { CompletableFuture<Void> anyOf = CompletableFuture.anyOf(f1, f2, f3); anyOf.get(); System.out.println("end......" ); }
根据实际场景选择对应的方法,有些任务,需要前面任务完成才能做,这时候需要使用串行化。有些任务,只需要将所有任务完成,任务之间没有关系,可以使用allOf最后对所有任务进行提交。