WhenCondition
对应 EL 规则中的 WHEN
关键字,会被包装为
WhenCondition
组件
主要的并发编排逻辑主要由 WhenCondition
的实现方法执行
属性
WHEN
关键字支持的属性:
ignoreError
:调用链调用失败时是否继续往下执行group
:并发分组;该属性已经弃用,因为可以使用不同的WHEN
进行分组控制,例如THEN(WHEN(a,b),WHEN(c,d))
any
:任意节点执行成功就继续向下执行threadExecutorClass
:线程池名称;实例化后的线程池会被放在ExecutorHelper
的Map<String, ExecutorService> executorServiceMap
属性
1 | public class WhenCondition extends Condition { |
任务编排
WhenCondition
实现的 Condition
抽象方法
executeCondition
核心操作就是将任务包装为 CompletableFuture
:
- 过滤前后置组件
- 过滤
isAccess
false;一是过滤掉不需要执行的组件,二是不执行的组件一定是执行完成最快的,会造成any
属性结果的混乱 - 对
Condition
下的Map<String, List<Executable>> executableGroup
包装为CompletableFuture
,编排异步任务 - 在包装为
CompletableFuture
过程中使用了CompletableFutureTimeout
工具,套入CompletableFutureTimeout
方法进行超时判断,如果超时则用WhenFutureObj.timeOut
返回超时的对象 - 最终将所有需要执行的任务放进集合
List<CompletableFuture<WhenFutureObj>>
根据 any 属性对任务集合再次进行编排封装
如果是 any
属性,则使用
CompletableFuture.anyOf
进行编排,否则使用
CompletableFuture.allOf
1 | if (this.isAny()) { |
最终等待任务执行完成
1 | try { |
超时实现
Java 8 的 CompletableFuture
并没有 timeout
机制,虽然可以在 get 的时候指定 timeout,但是是一个同步堵塞的操作
一般的实现方案是启动一个 ScheduledThreadpoolExecutor
线程在 timeout * 时间后直接调用
CompletableFuture.completeExceptionally(new TimeoutException())
* 然后用 acceptEither()
或者 applyToEither
看是执行完成还是超时
Java 9 引入了 orTimeout
和
completeOnTimeOut
两个方法支持 异步 timeout
机制,底层也是使用 ScheduledThreadpoolExecutor
进行实现的
这里 LiteFlow 将任务和执行和超时封装成了一个 API 工具
CompletableFutureTimeout
在上面任务包装过程中,调用了
CompletableFutureTimeout.completeOnTimeout
方法
1 | public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout, |
- 泛型 T 下的
t
,在任务编排中是WhenFutureObj
对象 future
是通过CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor)
编排的任务CompletableFuture
对象,返回结果依然是WhenFutureObj
timeout
是超时时间unit
是超时时间单位
timeoutAfter
创建出超时异常任务后,由业务任务
future
的 applyToEither
编排两个任务
超时异常任务
调用 timeoutAfter
包装超时任务
1 | public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) { |
delayer
是一个
ScheduledThreadPoolExecutor
创建出一个 CompletableFuture
,如果该任务在
get
时未完成,则抛出 TimeoutException
异常
WhenFutureObj
WhenFutureObj
是任务结果的包装对象
对于一个任务执行完成的状态有三种情况:
- 成功
- 失败
- 超时
在 ParallelSupplier
包装过程中对应了成功、失败两种返回
1 | public class ParallelSupplier implements Supplier<WhenFutureObj> { |
在超时任务中对应了超时的状态返回
WhenFutureObj.timeOut(executable.getId())
对象 build 方法
1 | public class WhenFutureObj { |
主要就是设置
success
、timeout
、异常信息等异步执行结果
结果解析
过滤出已经完成的任务,放到集合中
对于没有完成的任务就执行 cancel
操作
1 | List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { |
集合中的任务 get
出任务的产出,即包装的
WhenFutureObj
根据 WhenFutureObj
结果属性判断后续操作
- 输出超时信息
- 如果配置中
isIgnoreError
不忽略异常- 根据
interrupted[0]
集合中的中断情况抛出异常throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()))
- 循环判断
CompletableFuture
的返回值,如果异步执行失败,则抛出相应的业务异常;如果是超时,这里就会抛出在超时任务编排时抛出的TimeoutException
- 根据
- 如果忽略异常;则对中断输出 warn 日志
补充
CompletableFuture cancel
在 WhenCondition
执行流程中,对于包装的
anyOf
CompletableFuture
执行完成后,存在没有结束的任务,则会调用 CompletableFuture
的 cancel
方法
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
方法参数 mayInterruptIfRunning
实际并没有使用,也就是说整个 CompletableFuture
的执行并不会被中断
在官方文档中也说
@param mayInterruptIfRunning this value has no effect in this implementation because interrupts are not used to control processing.
这个值在这个实现中没有作用,因为中断不用于控制处理
该方法实现自 Future
,其定义的
mayInterruptIfRunning
并没有使用
StackOverFlow 中也有人提出了这样的问题,认为违反了
Java.util.concurrent.Future
中对 cancel
方法定义的约定
其原因在评论中有所讨论:
As mentioned in my previous (edited) comment, the
CompletableFuture
does not hold a reference to either the actual work or theThread
processing it. This is at least implied in the class documentation: "Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion"由于与
FutureTask
不同,CompletableFuture
对执行任务的线程没有直接控制,因此取消被视为另一种形式的异常完成
CompletableFuture
does not hold reference to thread, which I think it is also a problem, It is not consistency withFuture
CompletableFuture
没有引用线程,我认为这也是一个问题,它与Future
不一致
参考
java - Why does CompletableFuture implement the Future interface - Stack Overflow