贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

LiteFlow - 组件的超时时间

背景

允许对EL中的每⼀个组件设置超时时间控制 · Issue #I7I3LL · dromara/liteFlow - Gitee.com

issue 中需求了更细粒度的超时时间设置,允许 EL 中的每⼀个组件设置超时时间控制

如何使用可以参考官方文档

⏱️超时控制 | LiteFlow

实现

EL 关键字

在 EL 解析器中注册相关的表达式和 BaseOperator 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 /**
* EL解析引擎
*/
public final static ExpressRunner EXPRESS_RUNNER = new ExpressRunner();

static {
// 初始化QLExpress的Runner
EXPRESS_RUNNER.addFunction(ChainConstant.THEN, new ThenOperator());
EXPRESS_RUNNER.addFunction(ChainConstant.WHEN, new WhenOperator());
...
// 这里 MAX_WAIT_SECONDS = maxWaitSeconds
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_SECONDS, Object.class, new MaxWaitSecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PARALLEL, Object.class, new ParallelOperator());
}

MaxWaitSecondsOperator

MaxWaitSecondsOperator 实现自 BaseOperator

BaseOperator 为了强化 executeInner 方法,会捕获抛出的 QLException 错误,输出友好的错误提示

build 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Condition build(Object[] objects) throws Exception {
// 校验参数数量
OperatorHelper.checkObjectSizeEqTwo(objects);
// 转换参数 1 为执行对象
Executable executable = OperatorHelper.convert(objects[0], Executable.class);
// 获取传入的时间参数
Integer maxWaitSeconds = OperatorHelper.convert(objects[1], Integer.class);
if (executable instanceof WhenCondition) {
// WhenCondition,直接设置等待时间
WhenCondition whenCondition = OperatorHelper.convert(executable, WhenCondition.class);
whenCondition.setMaxWaitTime(maxWaitSeconds);
whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
return whenCondition;
} else if (executable instanceof FinallyCondition) {
// FINALLY,报错
String errorMsg = StrFormatter.format("The caller [{}] cannot use the keyword \"maxWaitSeconds'\"", executable.toString());
throw new QLException(errorMsg);
} else if (containsFinally(executable)) {
// 处理 THEN 中的 FINALLY
ThenCondition thenCondition = OperatorHelper.convert(executable, ThenCondition.class);
return handleFinally(thenCondition, maxWaitSeconds);
} else {
// 其他情况,被 WHEN 包装
return wrappedByTimeout(executable, maxWaitSeconds);
}
}

流程可以概括为:

  • 校验 & 准备工作
    • 校验参数数量;参数数量需要为 2(执行对象、超时时间)
    • 转换第一个参数为执行对象;调用 OperatorHelper.convert,逻辑中很多对象都用 Object 引用承接,在转换方法中进行校验;个人理解是牺牲了一些可读性换取了开发的灵活性
    • 获取超时时间参数
  • 包装 Condition
    • WhenCondition 直接设置等待时间
    • FinallyCondition 不允许设置超时时间
    • ThenCondition 中如果包含 FinallyCondition,会在 handleFinally 方法中包装成一个 WhenCondition,外层套一个 ThenCondition,将 FinallyCondition 排除出去,时间设置在里层的 WhenCondition
    • 其他则被包装成 TimeoutConditionTimeoutConditionWhenCondition 的实现,区别在于将执行对象抛出的 WhenTimeoutException 转换为 TimeoutException

WhenCondition

WhenCondition 在早期版本就支持超时设置

1
2
3
4
5
# 异步线程最⻓的等待时间(只⽤于when),默认值为15000
liteflow.when-max-wait-time=15000

# 异步线程最⻓的等待时间单位(只⽤于when),默认值为MILLISECONDS,毫秒
liteflow.when-max-wait-time-unit=MILLISECONDS

所以对于 WhenCondition 的处理只需要设置合适的属性值即可

1
2
3
4
5
6
7
if (executable instanceof WhenCondition) {
// WhenCondition,直接设置等待时间
WhenCondition whenCondition = OperatorHelper.convert(executable, WhenCondition.class);
whenCondition.setMaxWaitTime(maxWaitSeconds);
whenCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
return whenCondition;
}

WhenCondition 执行时,会启动一个 ScheduledThreadPoolExecutor 来实现超时效果

具体流程可以阅读 LiteFlow - WhenCondition 和异步超时机制 | 贫瘠之地 (kuga.fun)

FinallyCondition

FinallyCondition 不允许设置超时时间,会抛出异常

1
2
3
4
5
else if (executable instanceof FinallyCondition) {
// FINALLY,报错
String errorMsg = StrFormatter.format("The caller [{}] cannot use the keyword \"maxWaitSeconds'\"", executable.toString());
throw new QLException(errorMsg);
}

为什么 FinallyCondition 不能设置超时时间呢

在 issue 里可以看到这样的表达

如果不使用 maxWaitSeconds 则表示不使用超时控制。 此外,FINALLY 不能使用 maxWaitSeconds,其一定会被执行

这样的实现应该是为了避免作为兜底的后置处理,因为超时反而无法正常执行了

ThenCondition

这个 case 的入口是一个判断方法 containsFinally

1
2
3
4
5
6
7
8
9
10
/**
* 判断 THEN 中是否含有 FINALLY 组件
*
* @param executable 判断对象
* @return 含有 FINALLY 组件返回 true,否则返回 false
*/
private boolean containsFinally(Executable executable) {
return executable instanceof ThenCondition
&& CollUtil.isNotEmpty(((ThenCondition) executable).getFinallyConditionList());
}

这里其实判断了两部分内容:

  • executable 对象是一个 ThenCondition
  • 这个 ThenCondition 后置处理器不为空


满足条件后就会进入

1
2
3
4
5
else if (containsFinally(executable)) {
// 处理 THEN 中的 FINALLY
ThenCondition thenCondition = OperatorHelper.convert(executable, ThenCondition.class);
return handleFinally(thenCondition, maxWaitSeconds);
}

在这里首先提取了 ThenCondition 对象

然后调用 handleFinally 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 将 FINALLY 排除在超时控制之外
*
* @param thenCondition 待处理的 ThenCondition
* @param maxWaitSeconds 最大等待秒数
* @return 处理后的 ThenCondition
*/
private ThenCondition handleFinally(ThenCondition thenCondition, Integer maxWaitSeconds) {
// 进行如下转换
// THEN(PRE(a),b,FINALLY(c))
// => THEN(
// WHEN(THEN(PRE(a),b)),
// FINALLY(c))

// 定义外层 THEN
ThenCondition outerThenCondition = new ThenCondition();

// 把 FINALLY 转移到外层 THEN
List<Executable> finallyList = thenCondition.getExecutableList(ConditionKey.FINALLY_KEY);
finallyList.forEach(executable
-> outerThenCondition
.addFinallyCondition((FinallyCondition) executable));
finallyList.clear();

// 包装内部 THEN
WhenCondition whenCondition = wrappedByTimeout(thenCondition, maxWaitSeconds);
outerThenCondition.addExecutable(whenCondition);

return outerThenCondition;
}

这里为了实现正常执行 THEN 操作的超时而不影响到后置组件,选择对 ThenCondition 进了一层包装

这里包装的目的:

  • ThenCondition 包装为 WhenCondition 使其支持超时(因为 WHEN 原生支持超时)
  • 最外层包装一层 ThenConditionFinallyCondition 放在外面避免超时影响(因为 WHEN 也不支持后置处理)

最终将包装好的 ThenCondition 返回上游业务使用

其他

对于其他的类型,会被包装为 TimeoutCondition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 将一个 Executable 包装为带有单独超时控制的 TimeoutCondition
*
* @param executable 待包装对象
* @param maxWaitSeconds 最大等待秒数
* @return 包装后的 TimeoutCondition
*/
private TimeoutCondition wrappedByTimeout(Executable executable, Integer maxWaitSeconds) {
TimeoutCondition timeoutCondition = new TimeoutCondition();
timeoutCondition.addExecutable(executable);
timeoutCondition.setMaxWaitTime(maxWaitSeconds);
timeoutCondition.setMaxWaitTimeUnit(TimeUnit.SECONDS);
return timeoutCondition;
}

TimeoutCondition 本质上是 WhenCondition 的子类,优化了超时日志

日志处理

一开始写这个标题是因为感觉 ThenCondition 的包装会影响到日志的输出(毕竟包装了一个 WhenCondition 和一个 ThenCondition

但是又看了一下代码发现日志是只有 Component 才会输出的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public String getExecuteStepStr(boolean withTimeSpent) {
StringBuilder str = new StringBuilder();
CmpStep cmpStep;
for (Iterator<CmpStep> it = executeSteps.iterator(); it.hasNext();) {
cmpStep = it.next();
if (withTimeSpent) {
str.append(cmpStep.buildStringWithTime());
}
else {
str.append(cmpStep.buildString());
}
if (it.hasNext()) {
str.append("==>");
}
}
this.executeStepsStr = str.toString();
return this.executeStepsStr;
}

不过想到又看到的一个 issue

希望框架对并行日志进行优化 · Issue #I8B0MI · dromara/liteFlow - Gitee.com

这里会不会产生影响?

总结

类型 操作
WhenCondition 直接设置属性
FinallyCondition 不允许
ThenCondition 内部逻辑包装为 WHEN,外部包装一个 THEN,将后置处理器移至外部
其他 包装为 WhenCondition 的子类 TimeoutCondition

可以看到随着迭代关键字越来越多

功能和功能之间互相影响也越来越大

一个功能的添加需要考虑对不同已有功能的影响,需要对应出不同的实现方式