LiteFlow - WhenCondition 和异步超时机制

WhenCondition

对应 EL 规则中的 WHEN 关键字,会被包装为 WhenCondition 组件

主要的并发编排逻辑主要由 WhenCondition 的实现方法执行

属性

WHEN 关键字支持的属性:

  • ignoreError:调用链调用失败时是否继续往下执行
  • group:并发分组;该属性已经弃用,因为可以使用不同的 WHEN 进行分组控制,例如 THEN(WHEN(a,b),WHEN(c,d))
  • any:任意节点执行成功就继续向下执行
  • threadExecutorClass:线程池名称;实例化后的线程池会被放在 ExecutorHelperMap<String, ExecutorService> executorServiceMap 属性
1
2
3
4
5
6
7
8
9
10
11
public class WhenCondition extends Condition {

private final Logger LOG = LoggerFactory.getLogger(this.getClass());

private boolean ignoreError = false;

private String group = LocalDefaultFlowConstant.DEFAULT;

private boolean any = false;

private String threadExecutorClass;

任务编排

WhenCondition 实现的 Condition 抽象方法 executeCondition

核心操作就是将任务包装为 CompletableFuture

  1. 过滤前后置组件
  2. 过滤 isAccess false;一是过滤掉不需要执行的组件,二是不执行的组件一定是执行完成最快的,会造成 any 属性结果的混乱
  3. Condition 下的 Map<String, List<Executable>> executableGroup 包装为 CompletableFuture,编排异步任务
  4. 在包装为 CompletableFuture 过程中使用了 CompletableFutureTimeout 工具,套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象
  5. 最终将所有需要执行的任务放进集合 List<CompletableFuture<WhenFutureObj>>

根据 any 属性对任务集合再次进行编排封装

如果是 any 属性,则使用 CompletableFuture.anyOf 进行编排,否则使用 CompletableFuture.allOf

1
2
3
4
5
6
7
8
9
10
if (this.isAny()) {
// 把这些 CompletableFuture 通过 anyOf 合成一个 CompletableFuture
resultCompletableFuture = CompletableFuture
.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
}
else {
// 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture
resultCompletableFuture = CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
}

最终等待任务执行完成

1
2
3
4
try {
// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
resultCompletableFuture.get();
}

超时实现

Java 8 的 CompletableFuture 并没有 timeout 机制,虽然可以在 get 的时候指定 timeout,但是是一个同步堵塞的操作

一般的实现方案是启动一个 ScheduledThreadpoolExecutor 线程在 timeout * 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()) * 然后用 acceptEither() 或者 applyToEither 看是执行完成还是超时

Java 9 引入了 orTimeoutcompleteOnTimeOut 两个方法支持 异步 timeout 机制,底层也是使用 ScheduledThreadpoolExecutor 进行实现的

这里 LiteFlow 将任务和执行和超时封装成了一个 API 工具 CompletableFutureTimeout

在上面任务包装过程中,调用了 CompletableFutureTimeout.completeOnTimeout 方法

1
2
3
4
5
public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout,
TimeUnit unit) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
}
  • 泛型 T 下的 t,在任务编排中是 WhenFutureObj 对象
  • future 是通过 CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor) 编排的任务 CompletableFuture 对象,返回结果依然是 WhenFutureObj
  • timeout 是超时时间
  • unit 是超时时间单位

timeoutAfter 创建出超时异常任务后,由业务任务 futureapplyToEither 编排两个任务

超时异常任务

调用 timeoutAfter 包装超时任务

1
2
3
4
5
6
7
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()),
timeout, unit);
return result;
}

delayer 是一个 ScheduledThreadPoolExecutor

创建出一个 CompletableFuture,如果该任务在 get 时未完成,则抛出 TimeoutException 异常

WhenFutureObj

WhenFutureObj 是任务结果的包装对象

对于一个任务执行完成的状态有三种情况:

  • 成功
  • 失败
  • 超时

ParallelSupplier 包装过程中对应了成功、失败两种返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ParallelSupplier implements Supplier<WhenFutureObj> {

private static final Logger LOG = LoggerFactory.getLogger(ParallelSupplier.class);

private final Executable executableItem;

private final String currChainId;

private final Integer slotIndex;

public ParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
}

@Override
public WhenFutureObj get() {
try {
executableItem.setCurrChainId(currChainId);
executableItem.execute(slotIndex);
// 成功
return WhenFutureObj.success(executableItem.getId());
}
catch (Exception e) {
// 失败
return WhenFutureObj.fail(executableItem.getId(), e);
}
}
}

在超时任务中对应了超时的状态返回 WhenFutureObj.timeOut(executable.getId())

对象 build 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class WhenFutureObj {

private boolean success;

private boolean timeout;

private String executorName;

private Exception ex;

public static WhenFutureObj success(String executorName) {
WhenFutureObj result = new WhenFutureObj();
result.setSuccess(true);
result.setTimeout(false);
result.setExecutorName(executorName);
return result;
}

public static WhenFutureObj fail(String executorName, Exception ex) {
WhenFutureObj result = new WhenFutureObj();
result.setSuccess(false);
result.setTimeout(false);
result.setExecutorName(executorName);
result.setEx(ex);
return result;
}

public static WhenFutureObj timeOut(String executorName) {
WhenFutureObj result = new WhenFutureObj();
result.setSuccess(false);
result.setTimeout(true);
result.setExecutorName(executorName);
result.setEx(new WhenTimeoutException(
StrUtil.format("Timed out when executing the component[{}],when-max-timeout-seconds config is:{}(s)",
executorName, LiteflowConfigGetter.get().getWhenMaxWaitSeconds())));
return result;
}
...
}

主要就是设置 successtimeout、异常信息等异步执行结果

结果解析

过滤出已经完成的任务,放到集合中

对于没有完成的任务就执行 cancel 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
// 过滤出已经完成的,没完成的就直接终止
if (f.isDone()) {
return true;
}
else {
f.cancel(true);
return false;
}
}).map(f -> {
try {
return f.get();
}
catch (InterruptedException | ExecutionException e) {
interrupted[0] = true;
return null;
}
}).collect(Collectors.toList());

集合中的任务 get 出任务的产出,即包装的 WhenFutureObj

根据 WhenFutureObj 结果属性判断后续操作

  • 输出超时信息
  • 如果配置中 isIgnoreError 不忽略异常
    • 根据 interrupted[0] 集合中的中断情况抛出异常 throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()))
    • 循环判断 CompletableFuture 的返回值,如果异步执行失败,则抛出相应的业务异常;如果是超时,这里就会抛出在超时任务编排时抛出的 TimeoutException
  • 如果忽略异常;则对中断输出 warn 日志

补充

CompletableFuture cancel

WhenCondition 执行流程中,对于包装的 anyOf CompletableFuture 执行完成后,存在没有结束的任务,则会调用 CompletableFuturecancel 方法

1
2
3
4
5
6
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = (result == null) &&
internalComplete(new AltResult(new CancellationException()));
postComplete();
return cancelled || isCancelled();
}

方法参数 mayInterruptIfRunning 实际并没有使用,也就是说整个 CompletableFuture 的执行并不会被中断

在官方文档中也说

@param mayInterruptIfRunning this value has no effect in this implementation because interrupts are not used to control processing.

这个值在这个实现中没有作用,因为中断不用于控制处理

该方法实现自 Future,其定义的 mayInterruptIfRunning 并没有使用

StackOverFlow 中也有人提出了这样的问题,认为违反了 Java.util.concurrent.Future 中对 cancel 方法定义的约定

其原因在评论中有所讨论:

  • As mentioned in my previous (edited) comment, the CompletableFuture does not hold a reference to either the actual work or the Thread processing it. This is at least implied in the class documentation: "Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion"

    由于与 FutureTask 不同,CompletableFuture 对执行任务的线程没有直接控制,因此取消被视为另一种形式的异常完成

  • CompletableFuture does not hold reference to thread, which I think it is also a problem, It is not consistency with Future

    CompletableFuture 没有引用线程,我认为这也是一个问题,它与 Future 不一致

参考

java - Why does CompletableFuture implement the Future interface - Stack Overflow