LiteFlow - 决策路由

思想

LiteFlow 2.12.0 版本的主要 Feature 就是对决策路由的支持

功能引入的文章

对应的 Issue

这样就能实现类似 Drools 中的决策表功能

使用 SWITCH 实现

决策路由的本质就是判断所有规则的执行条件,并且执行符合条件的规则

LangChain 早期的设计,入口都是指定一个 Chain 进行执行,但也是可以变相支持这种操作,在入口设置一个 SWITCH 节点,相当于用一个逻辑判断组合所有的 Chain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<flow>
<chain name="chain">
THEN(SWITCH(condition)
.to(
decisionA,
decisionB,
decisionC
)
);
</chain>

<!-- 决策逻辑 A-->
<chain name="decisionA">
...
</chain>

<!-- 决策逻辑 B-->
<chain name="decisionB">
...
</chain>

<!-- 决策逻辑 C-->
<chain name="decisionC">
...
</chain>
</flow>

上面这个 EL 将决策 A、B、C 三个 Chain 通过一个选择组件联系了起来,最终实现决策表的效果

需要注意:

  • 路由体只能由布尔组件组成
  • 路由体只能是与或非表达式
  • 匹配到的每一个规则的上下文实例都是单独的,运行时是并行执行
  • 路由中的 EL 启动时一定会检查
  • 路由体的 EL 支持 tag 等关键字
  • JSON 和 YAML 等格式使用 value name 的 key 来存放规则体,所以没有 body

SWITCH 选择组件

这里复习一下 CMP 中的选择组件

选择组件需要继承 NodeSwitchComponent,实现 processSwitch 方法

上面的例子中,就可以这样来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@LiteflowComponent("condition")
public class MyCondition extends NodeSwitchComponent {

@Override
public String processSwitch() throws Exception {
// 决策判断
int type = 0;

switch (type) {
case 1:
return "decisionA";
case 2:
return "decisionB";
case 3:
return "decisionV";
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
}

这里返回的字符串就是控制下一个流程的执行,支持

  • node ID(例如 @LiteflowComponent 上配置的组件 ID)
  • 表达式 ID(对于 EL 表达式中一个表达式设置的 ID WHEN(c,d).id("w1")
  • 组件标签(a.tag("1")
  • 表达式标签(WHEN(c,d).tag("w1")
  • 子流程标签(sub 是一个 chain,sub.tag("w1")

代码中对组件进行实现后,对应的 EL 就可以如下书写

1
2
3
4
5
6
7
<chain name="switch1">
SWITCH(x).TO(a, b, c).DEFAULT(y);
</chain>

<chain name="switch1">
SWITCH(x).TO(a, b, THEN(c, d).id("t1")).DEFAULT(y);
</chain>

存在的问题

虽然用 SWITCH 可以实现类似全局决策表的功能,但其实还是有一些不同

  • 每次都要通过前置选择组件的方式实现,操作繁琐
  • 增加新的 Chain 实现时容易漏掉,并且需要改动选择组件实现的代码来接入
  • 无法实现更灵活的组件选择(例如根据某种情况进行多种排列组合,要对 EL 进行表达式拆分)

使用

文档

在定义规则的时候,新增了 routebody 标签

  • route 内是决策 EL,决策 EL 里只能用与或非表达式,内部的组件只能是布尔组件
  • body 内就是原有的规则 EL

我理解基本的实现思想就是定义了一个顶层的 EL,作为决策路由的入口,相当于顶层的选择组件,是框架帮忙加的

1
2
3
4
5
6
7
8
<chain name="chain1">
<route>
AND(r1, r2, r3)
</route>
<body>
THEN(a, b, c);
</body>
</chain>

执行时调用 FlowExecutor 提供的一系列新的 API

1
List<LiteflowResponse> responseList = flowExecutor.executeRouteChain(requestData, YourContext.class);

调用后 LiteFlow 会去并行的判断决策路由,符合决策的规则也会被并行的进行执行

返回是一个 List<LiteflowResponse>,List 里面的就是每一个匹配到的规则执行后的结果

Demo

假设有一个成就系统,根据用户运动的一些数据进行成就的发放

模拟用户成就数据

这里用一个静态 set 存放数据,假设是一次持久化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UserAchievement {

private static final Map<String, Integer> ACHIEVEMENTS_AND_COUNT = new HashMap<>();

@Synchronized
public static void addAchievement(String achievement) {
ACHIEVEMENTS_AND_COUNT.put(achievement, ACHIEVEMENTS_AND_COUNT.getOrDefault(achievement, 0) + 1);
}

public static Map<String, Integer> getAchievements() {
return ACHIEVEMENTS_AND_COUNT;
}
}

上下文参数

承接用户的运动数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ExerciseData {
/**
* 用户 ID
*/
private Long userId;

/**
* 运动距离,单位米
*/
private Integer exerciseDistanceMeters;

/**
* 运动时间,单位分钟
*/
private Integer exerciseTimeMinutes;

/**
* 完成时间
*/
private Long timestamp;
}

判断组件

跑步成就相关判断的布尔组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 5 公里
@Component("jr5k")
public class JudgeRun5Kilometers extends NodeBooleanComponent {

@Override
public boolean processBoolean() throws Exception {
ExerciseData exerciseData = getRequestData();

return Optional.ofNullable(exerciseData)
.map(ExerciseData::getExerciseDistanceMeters)
.map(distance -> distance >= 5000)
.orElse(false);
}
}

// 10 公里
@Component("jr10k")
public class JudgeRun10Kilometers extends NodeBooleanComponent {

@Override
public boolean processBoolean() throws Exception {
ExerciseData exerciseData = getRequestData();

return Optional.ofNullable(exerciseData)
.map(ExerciseData::getExerciseDistanceMeters)
.map(distance -> distance >= 10000)
.orElse(false);
}
}

运动时间成就相关判断的布尔组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 运动 30 分钟
@Component("je30m")
public class JudgeExercise30Minutes extends NodeBooleanComponent {

@Override
public boolean processBoolean() throws Exception {
ExerciseData exerciseData = getRequestData();

return Optional.ofNullable(exerciseData)
.map(ExerciseData::getExerciseTimeMinutes)
.map(timeMinutes -> timeMinutes >= 30)
.orElse(false);
}
}

// 运动 60 分钟
@Component("je60m")
public class JudgeExercise60Minutes extends NodeBooleanComponent {

@Override
public boolean processBoolean() throws Exception {
ExerciseData exerciseData = getRequestData();

return Optional.ofNullable(exerciseData)
.map(ExerciseData::getExerciseTimeMinutes)
.map(timeMinutes -> timeMinutes >= 60)
.orElse(false);
}
}

判断组件就是用于决策路由 EL 中的 rounte 部分

执行组件

执行组件用于发放奖励

1
2
3
4
5
6
7
8
9
// 跑步 5 公里成就
@LiteflowComponent("ar5k")
public class AchievementRun5Kilometers extends NodeComponent {

@Override
public void process() throws Exception {
UserAchievement.addAchievement("run 5 kilometers");
}
}

其他的实现类似

EL

使用 EL 配置决策路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?xml version="1.0" encoding="UTF-8"?>

<flow>
<chain name="exercise-30-minutes">
<route>
je30m;
</route>
<body>
THEN(ae30m);
</body>
</chain>

<chain name="exercise-60-minutes">
<route>
je60m;
</route>
<body>
THEN(ae60m);
</body>
</chain>

<chain name="run-5-kilometers">
<route>
jr5k;
</route>
<body>
THEN(ar5k);
</body>
</chain>

<chain name="run-10-kilometers">
<route>
jr10k;
</route>
<body>
THEN(ar10k);
</body>
</chain>
</flow>

执行

1
2
3
4
5
6
7
8
9
10
11
12
ExerciseData exerciseData = ExerciseData.builder()
.userId(1L)
.exerciseTimeMinutes(50)
.exerciseDistanceMeters(15000)
.timestamp(System.currentTimeMillis())
.build();

// 执行结果
List<LiteflowResponse> responseList = flowExecutor.executeRouteChain(exerciseData);

// 成就结果
System.out.println("STAR " + UserAchievement.getAchievements());
1
STAR {run 10 kilometers=1, exercise 30 minutes=1, run 5 kilometers=1}

可以看到 15 km 的运动距离同时获得了 5 和 10 km 两个成就

1
2
3
4
5
6
7
8
ExerciseData exerciseData = ExerciseData.builder()
.userId(1L)
.exerciseTimeMinutes(50)
.exerciseDistanceMeters(15000)
.timestamp(System.currentTimeMillis())
.build();

System.out.println("STAR " + UserAchievement.getAchievements());
1
STAR {run 5 kilometers=1}

只满足了跑步 5 km 的成就

源码

FlowExcutor 执行

路由决策的入口最终会在设置一些运行时参数后调用 doExecuteWithRoute

1
2
3
4
5
6
private List<LiteflowResponse> executeWithRoute(Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray){
// 核心入口 返回值是 Slot 的集合
List<Slot> slotList = doExecuteWithRoute(param, requestId, contextBeanClazzArray, contextBeanArray);
// wrap response
return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList());
}

执行准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private List<Slot> doExecuteWithRoute(Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray){
// 判断 FlowBus 初始化
if (FlowBus.needInit()) {
init(true);
}

// 获取带有路由体的 Chain 集合
List<Chain> routeChainList = FlowBus.getChainMap().values().stream().filter(chain -> chain.getRouteItem() != null).collect(Collectors.toList());

// 如果没有会抛一个异常,因为没有路由规则就没必要调用 executeWithRoute
if (CollUtil.isEmpty(routeChainList)){
throw new RouteChainNotFoundException("cannot find any route chain");
}

// 确定 requestId
String finalRequestId;
if (StrUtil.isBlank(requestId)){
finalRequestId = IdGeneratorHolder.getInstance().generate();
}else{
finalRequestId = requestId;
}
...
}

这里在做调用前的准备

  1. 判断 FlowBus 是否初始化完成;FlowBus 是存放各种 Executable 的工具,初始化完成后应该加载了环境下所有的组件;没有初始化则初始化是为了兼容其他环境或者 Spring 环境下的特殊配置,一般 Spring 环境在 starter 中的 Configuration 就完成了初始化
  2. 获取所有决策路由的 Chain 并校验
  3. 确定 requestId;生成一个或者使用参数

异步执行路由体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private List<Slot> doExecuteWithRoute(Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray){
...

// 异步执行 route el
List<Tuple> routeTupleList = new ArrayList<>();
for (Chain routeChain : routeChainList){
CompletableFuture<Slot> f = CompletableFuture.supplyAsync(
() -> doExecute(routeChain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, null, InnerChainTypeEnum.NONE, ChainExecuteModeEnum.ROUTE)
);

routeTupleList.add(new Tuple(routeChain, f));
}

// 获取结果
CompletableFuture<?> resultRouteCf = CompletableFuture.allOf(routeTupleList.stream().map(
(Function<Tuple, CompletableFuture<?>>) tuple -> tuple.get(1)
).collect(Collectors.toList()).toArray(new CompletableFuture[] {}));
try{
resultRouteCf.get();
}catch (Exception e){
throw new LiteFlowException("There is An error occurred while executing the route.", e);
}

...
}
  1. 异步执行这些 Chain,将 CompletableFuture 和 Chain 本身包装为 Tuple;因为后面执行 body 里的逻辑还需要用到这个 Chain 对象
  2. 等待任务都执行完成,check 一下有没有异常,任一有异常就会抛出一个 LiteFlowException

获取路由结果并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 把 route 执行为 true 都过滤出来
List<Chain> matchedRouteChainList = routeTupleList.stream().filter(tuple -> {
try{
CompletableFuture<Slot> f = tuple.get(1);
Slot slot = f.get();
return BooleanUtil.isTrue(slot.getRouteResult());
}catch (Exception e){
return false;
}
}).map(
(Function<Tuple, Chain>) tuple -> tuple.get(0)
).collect(Collectors.toList());

if (CollUtil.isEmpty(matchedRouteChainList)){
throw new NoMatchedRouteChainException("there is no matched route chain");
}

// 异步分别执行这些 chain
List<CompletableFuture<Slot>> executeChainCfList = new ArrayList<>();
for (Chain chain : matchedRouteChainList){
CompletableFuture<Slot> cf = CompletableFuture.supplyAsync(
() -> doExecute(chain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, null, InnerChainTypeEnum.NONE, ChainExecuteModeEnum.BODY)
);
executeChainCfList.add(cf);
}
  1. 从上一步的流程中获取路由结果为 true 的
  2. 异步执行路由结果为 true 的 Chain

处理结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CompletableFuture<?> resultChainCf = CompletableFuture.allOf(executeChainCfList.toArray(new CompletableFuture[] {}));
try{
resultChainCf.get();
}catch (Exception e){
throw new LiteFlowException("There is An error occurred while executing the matched chain.", e);
}


List<Slot> resultSlotList = executeChainCfList.stream().map(slotCompletableFuture -> {
try{
return slotCompletableFuture.get();
}catch (Exception e){
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toList());

LOG.info("There are {} chains that matched the route.", resultSlotList.size());

return resultSlotList;

最终就是把结果获取,最后返回

返回的对象是 Slot,保存了整个 Chain 运行时的元数据、过程数据、结果等

包装 Response

1
return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static LiteflowResponse newResponse(Slot slot, Exception e) {
LiteflowResponse response = new LiteflowResponse();
response.setChainId(slot.getChainId());
if (e != null) {
response.setSuccess(false);
response.setCause(e);
response.setMessage(response.getCause().getMessage());
response.setCode(response.getCause() instanceof LiteFlowException
? ((LiteFlowException) response.getCause()).getCode() : null);
}
else {
response.setSuccess(true);
}
response.setSlot(slot);
return response;
}

最终根据执行情况包装为 LiteflowResponse,设置不同的属性

EL 解析

这里以 XML 格式为例

解析 route 和 body

com.yomahub.liteflow.parser.helper.ParserHelper#parseOneChainEl(org.dom4j.Element)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Element routeElement = e.element(ROUTE);

LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);

// 存在路由体
if (routeElement != null){
builder.setRoute(ElRegexUtil.removeComments(routeElement.getText()));

// 有路由体必然有 Body
Element bodyElement = e.element(BODY);
if (bodyElement == null){
String errMsg = StrUtil.format("If you have defined the tag <route>, then you must define the tag <body> in chain[{}]", chainId);
throw new FlowSystemException(errMsg);
}
builder.setEL(ElRegexUtil.removeComments(bodyElement.getText()));
}
// 不存在
else{
// 这里兼容新增的 Body 标签写法和原有无任何标签的写法
Element bodyElement = e.element(BODY);
if (bodyElement != null){
builder.setEL(ElRegexUtil.removeComments(bodyElement.getText()));
}else{
builder.setEL(ElRegexUtil.removeComments(e.getText()));
}
}

路由体是新的处理,使用 builder.setRoute(ElRegexUtil.removeComments(routeElement.getText()))

构造相关的对象

而对于 body 则和旧有没有标签的逻辑是一致的

Build 路由体

com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder#setRoute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public LiteFlowChainELBuilder setRoute(String routeEl) {
if (StrUtil.isBlank(routeEl)) {
String errMsg = StrUtil.format("You have defined the label <route> but there is no el in the chain route[{}].", chain.getChainId());
throw new FlowSystemException(errMsg);
}
List<String> errorList = new ArrayList<>();
try {
DefaultContext<String, Object> context = new DefaultContext<>();

// 往上下文里放入所有的node,使得el表达式可以直接引用到nodeId
FlowBus.getNodeMap().keySet().forEach(nodeId -> context.put(nodeId, FlowBus.getNode(nodeId)));

// 解析route el成为一个executable
Executable routeExecutable = (Executable) EXPRESS_RUNNER.execute(routeEl, context, errorList, true, true);

// 判断routeEL是不是符合规范
if (!(routeExecutable instanceof AndOrCondition || routeExecutable instanceof NotCondition || routeExecutable instanceof Node)) {
throw new RouteELInvalidException("the route EL can only be a boolean node, or an AND or OR expression.");
}

// 把主要的condition加入
this.route = routeExecutable;
return this;
} catch (QLException e) {
// EL 底层会包装异常,这里是曲线处理
if (ObjectUtil.isNotNull(e.getCause()) && Objects.equals(e.getCause().getMessage(), DataNotFoundException.MSG)) {
// 构建错误信息
String msg = buildDataNotFoundExceptionMsg(routeEl);
throw new ELParseException(msg);
} else if (ObjectUtil.isNotNull(e.getCause())) {
throw new ELParseException(e.getCause().getMessage());
} else {
throw new ELParseException(e.getMessage());
}
} catch (RouteELInvalidException e) {
throw e;
} catch (Exception e) {
String errMsg = StrUtil.format("parse el fail in this chain[{}];\r\n", chain.getChainId());
throw new ELParseException(errMsg + e.getMessage());
}
}
  1. 构造 EL 解析的上下文;这里只放入了 Node,说明路由体中不支持子流程
  2. 经过 ExpressRunner,解析出 Executable
  3. 校验 Executable 只能是布尔组件、或者与或非 Condition
  4. 保存到 route 属性

Build Chain

1
2
3
4
5
6
7
8
9
public void build() {
this.chain.setRouteItem(this.route);
this.chain.setConditionList(this.conditionList);

//暂且去掉循环依赖检测,因为有发现循环依赖检测在对大的EL进行检测的时候,会导致CPU飙升,也或许是jackson低版本的问题
//checkBuild();

FlowBus.addChain(this.chain);
}

com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder#build

  1. 执行 build 方法后会将路由体和规则设置为 Chain 对象的属性
  2. 将该 Chain 对象加入 FlowBus

总结

本质上就是从框架层面支持了顶层的判断表达,使用异步判断条件是否符合并且执行结果

路由和路由之间是隔离的,性能可能会存在问题

参考

🍽决策路由用法 | LiteFlow