思想
LiteFlow 2.12.0 版本的主要 Feature 就是对决策路由的支持
功能引入的文章
对应的
Issue
这样就能实现类似 Drools 中的决策表功能
使用 SWITCH 实现
决策路由的本质就是判断所有规则的执行条件,并且执行符合条件的规则
LangChain 早期的设计,入口都是指定一个 Chain
进行执行,但也是可以变相支持这种操作,在入口设置一个 SWITCH
节点,相当于用一个逻辑判断组合所有的 Chain
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 <flow > <chain name ="chain" > THEN(SWITCH(condition) .to( decisionA, decisionB, decisionC ) ); </chain > <chain name ="decisionA" > ... </chain > <chain name ="decisionB" > ... </chain > <chain name ="decisionC" > ... </chain > </flow >
上面这个 EL 将决策 A、B、C 三个 Chain
通过一个选择组件联系了起来,最终实现决策表的效果
需要注意:
路由体只能由布尔组件组成
路由体只能是与或非表达式
匹配到的每一个规则的上下文实例都是单独的,运行时是并行执行
路由中的 EL 启动时一定会检查
路由体的 EL 支持 tag
等关键字
JSON 和 YAML 等格式使用 value
name 的 key
来存放规则体,所以没有 body
SWITCH 选择组件
这里复习一下 CMP 中的选择组件
选择组件需要继承 NodeSwitchComponent
,实现
processSwitch
方法
上面的例子中,就可以这样来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @LiteflowComponent("condition") public class MyCondition extends NodeSwitchComponent { @Override public String processSwitch () throws Exception { int type = 0 ; switch (type) { case 1 : return "decisionA" ; case 2 : return "decisionB" ; case 3 : return "decisionV" ; default : throw new IllegalStateException ("Unexpected value: " + type); } } }
这里返回的字符串就是控制下一个流程的执行,支持
node ID(例如 @LiteflowComponent
上配置的组件 ID)
表达式 ID(对于 EL 表达式中一个表达式设置的 ID
WHEN(c,d).id("w1")
)
组件标签(a.tag("1")
)
表达式标签(WHEN(c,d).tag("w1")
)
子流程标签(sub 是一个 chain,sub.tag("w1")
)
代码中对组件进行实现后,对应的 EL 就可以如下书写
1 2 3 4 5 6 7 <chain name ="switch1" > SWITCH(x).TO(a, b, c).DEFAULT(y); </chain > <chain name ="switch1" > SWITCH(x).TO(a, b, THEN(c, d).id("t1")).DEFAULT(y); </chain >
存在的问题
虽然用 SWITCH 可以实现类似全局决策表的功能,但其实还是有一些不同
每次都要通过前置选择组件的方式实现,操作繁琐
增加新的 Chain
实现时容易漏掉,并且需要改动选择组件实现的代码来接入
无法实现更灵活的组件选择(例如根据某种情况进行多种排列组合,要对 EL
进行表达式拆分)
使用
文档
在定义规则的时候,新增了 route
和 body
标签
route 内是决策 EL,决策 EL
里只能用与或非表达式,内部的组件只能是布尔组件
body 内就是原有的规则 EL
我理解基本的实现思想就是定义了一个顶层的
EL,作为决策路由的入口,相当于顶层的选择组件,是框架帮忙加的
1 2 3 4 5 6 7 8 <chain name ="chain1" > <route > AND(r1, r2, r3) </route > <body > THEN(a, b, c); </body > </chain >
执行时调用 FlowExecutor
提供的一系列新的 API
1 List<LiteflowResponse> responseList = flowExecutor.executeRouteChain(requestData, YourContext.class);
调用后 LiteFlow
会去并行的判断决策路由,符合决策的规则也会被并行的进行执行
返回是一个 List<LiteflowResponse>
,List
里面的就是每一个匹配到的规则执行后的结果
Demo
假设有一个成就系统,根据用户运动的一些数据进行成就的发放
模拟用户成就数据
这里用一个静态 set 存放数据,假设是一次持久化操作
1 2 3 4 5 6 7 8 9 10 11 12 13 public class UserAchievement { private static final Map<String, Integer> ACHIEVEMENTS_AND_COUNT = new HashMap <>(); @Synchronized public static void addAchievement (String achievement) { ACHIEVEMENTS_AND_COUNT.put(achievement, ACHIEVEMENTS_AND_COUNT.getOrDefault(achievement, 0 ) + 1 ); } public static Map<String, Integer> getAchievements () { return ACHIEVEMENTS_AND_COUNT; } }
上下文参数
承接用户的运动数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Data @Builder @AllArgsConstructor @NoArgsConstructor public class ExerciseData { private Long userId; private Integer exerciseDistanceMeters; private Integer exerciseTimeMinutes; private Long timestamp; }
判断组件
跑步成就相关判断的布尔组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Component("jr5k") public class JudgeRun5Kilometers extends NodeBooleanComponent { @Override public boolean processBoolean () throws Exception { ExerciseData exerciseData = getRequestData(); return Optional.ofNullable(exerciseData) .map(ExerciseData::getExerciseDistanceMeters) .map(distance -> distance >= 5000 ) .orElse(false ); } } @Component("jr10k") public class JudgeRun10Kilometers extends NodeBooleanComponent { @Override public boolean processBoolean () throws Exception { ExerciseData exerciseData = getRequestData(); return Optional.ofNullable(exerciseData) .map(ExerciseData::getExerciseDistanceMeters) .map(distance -> distance >= 10000 ) .orElse(false ); } }
运动时间成就相关判断的布尔组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Component("je30m") public class JudgeExercise30Minutes extends NodeBooleanComponent { @Override public boolean processBoolean () throws Exception { ExerciseData exerciseData = getRequestData(); return Optional.ofNullable(exerciseData) .map(ExerciseData::getExerciseTimeMinutes) .map(timeMinutes -> timeMinutes >= 30 ) .orElse(false ); } } @Component("je60m") public class JudgeExercise60Minutes extends NodeBooleanComponent { @Override public boolean processBoolean () throws Exception { ExerciseData exerciseData = getRequestData(); return Optional.ofNullable(exerciseData) .map(ExerciseData::getExerciseTimeMinutes) .map(timeMinutes -> timeMinutes >= 60 ) .orElse(false ); } }
判断组件就是用于决策路由 EL 中的 rounte
部分
执行组件
执行组件用于发放奖励
1 2 3 4 5 6 7 8 9 @LiteflowComponent("ar5k") public class AchievementRun5Kilometers extends NodeComponent { @Override public void process () throws Exception { UserAchievement.addAchievement("run 5 kilometers" ); } }
其他的实现类似
EL
使用 EL 配置决策路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 <?xml version="1.0" encoding="UTF-8" ?> <flow > <chain name ="exercise-30-minutes" > <route > je30m; </route > <body > THEN(ae30m); </body > </chain > <chain name ="exercise-60-minutes" > <route > je60m; </route > <body > THEN(ae60m); </body > </chain > <chain name ="run-5-kilometers" > <route > jr5k; </route > <body > THEN(ar5k); </body > </chain > <chain name ="run-10-kilometers" > <route > jr10k; </route > <body > THEN(ar10k); </body > </chain > </flow >
执行
1 2 3 4 5 6 7 8 9 10 11 12 ExerciseData exerciseData = ExerciseData.builder() .userId(1L ) .exerciseTimeMinutes(50 ) .exerciseDistanceMeters(15000 ) .timestamp(System.currentTimeMillis()) .build(); List<LiteflowResponse> responseList = flowExecutor.executeRouteChain(exerciseData); System.out.println("STAR " + UserAchievement.getAchievements());
1 STAR {run 10 kilometers =1, exercise 30 minutes =1, run 5 kilometers =1}
可以看到 15 km 的运动距离同时获得了 5 和 10 km 两个成就
1 2 3 4 5 6 7 8 ExerciseData exerciseData = ExerciseData.builder() .userId(1L ) .exerciseTimeMinutes(50 ) .exerciseDistanceMeters(15000 ) .timestamp(System.currentTimeMillis()) .build(); System.out.println("STAR " + UserAchievement.getAchievements());
1 STAR {run 5 kilometers =1}
只满足了跑步 5 km 的成就
源码
FlowExcutor 执行
路由决策的入口最终会在设置一些运行时参数后调用
doExecuteWithRoute
1 2 3 4 5 6 private List<LiteflowResponse> executeWithRoute (Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray) { List<Slot> slotList = doExecuteWithRoute(param, requestId, contextBeanClazzArray, contextBeanArray); return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList()); }
执行准备
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private List<Slot> doExecuteWithRoute (Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray) { if (FlowBus.needInit()) { init(true ); } List<Chain> routeChainList = FlowBus.getChainMap().values().stream().filter(chain -> chain.getRouteItem() != null ).collect(Collectors.toList()); if (CollUtil.isEmpty(routeChainList)){ throw new RouteChainNotFoundException ("cannot find any route chain" ); } String finalRequestId; if (StrUtil.isBlank(requestId)){ finalRequestId = IdGeneratorHolder.getInstance().generate(); }else { finalRequestId = requestId; } ... }
这里在做调用前的准备
判断 FlowBus
是否初始化完成;FlowBus
是存放各种 Executable
的工具,初始化完成后应该加载了环境下所有的组件;没有初始化则初始化是为了兼容其他环境或者
Spring 环境下的特殊配置,一般 Spring 环境在 starter 中的 Configuration
就完成了初始化
获取所有决策路由的 Chain 并校验
确定 requestId;生成一个或者使用参数
异步执行路由体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private List<Slot> doExecuteWithRoute (Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray) { ... List<Tuple> routeTupleList = new ArrayList <>(); for (Chain routeChain : routeChainList){ CompletableFuture<Slot> f = CompletableFuture.supplyAsync( () -> doExecute(routeChain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, null , InnerChainTypeEnum.NONE, ChainExecuteModeEnum.ROUTE) ); routeTupleList.add(new Tuple (routeChain, f)); } CompletableFuture<?> resultRouteCf = CompletableFuture.allOf(routeTupleList.stream().map( (Function<Tuple, CompletableFuture<?>>) tuple -> tuple.get(1 ) ).collect(Collectors.toList()).toArray(new CompletableFuture [] {})); try { resultRouteCf.get(); }catch (Exception e){ throw new LiteFlowException ("There is An error occurred while executing the route." , e); } ... }
异步执行这些 Chain,将 CompletableFuture
和 Chain
本身包装为 Tuple
;因为后面执行 body 里的逻辑还需要用到这个
Chain 对象
等待任务都执行完成,check 一下有没有异常,任一有异常就会抛出一个
LiteFlowException
获取路由结果并执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 List<Chain> matchedRouteChainList = routeTupleList.stream().filter(tuple -> { try { CompletableFuture<Slot> f = tuple.get(1 ); Slot slot = f.get(); return BooleanUtil.isTrue(slot.getRouteResult()); }catch (Exception e){ return false ; } }).map( (Function<Tuple, Chain>) tuple -> tuple.get(0 ) ).collect(Collectors.toList()); if (CollUtil.isEmpty(matchedRouteChainList)){ throw new NoMatchedRouteChainException ("there is no matched route chain" ); } List<CompletableFuture<Slot>> executeChainCfList = new ArrayList <>(); for (Chain chain : matchedRouteChainList){ CompletableFuture<Slot> cf = CompletableFuture.supplyAsync( () -> doExecute(chain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, null , InnerChainTypeEnum.NONE, ChainExecuteModeEnum.BODY) ); executeChainCfList.add(cf); }
从上一步的流程中获取路由结果为 true 的
异步执行路由结果为 true 的 Chain
处理结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 CompletableFuture<?> resultChainCf = CompletableFuture.allOf(executeChainCfList.toArray(new CompletableFuture [] {})); try { resultChainCf.get(); }catch (Exception e){ throw new LiteFlowException ("There is An error occurred while executing the matched chain." , e); } List<Slot> resultSlotList = executeChainCfList.stream().map(slotCompletableFuture -> { try { return slotCompletableFuture.get(); }catch (Exception e){ return null ; } }).filter(Objects::nonNull).collect(Collectors.toList()); LOG.info("There are {} chains that matched the route." , resultSlotList.size()); return resultSlotList;
最终就是把结果获取,最后返回
返回的对象是 Slot,保存了整个 Chain
运行时的元数据、过程数据、结果等
包装 Response
1 return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList());
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static LiteflowResponse newResponse (Slot slot, Exception e) { LiteflowResponse response = new LiteflowResponse (); response.setChainId(slot.getChainId()); if (e != null ) { response.setSuccess(false ); response.setCause(e); response.setMessage(response.getCause().getMessage()); response.setCode(response.getCause() instanceof LiteFlowException ? ((LiteFlowException) response.getCause()).getCode() : null ); } else { response.setSuccess(true ); } response.setSlot(slot); return response; }
最终根据执行情况包装为
LiteflowResponse
,设置不同的属性
EL 解析
这里以 XML 格式为例
解析 route 和 body
见
com.yomahub.liteflow.parser.helper.ParserHelper#parseOneChainEl(org.dom4j.Element)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 Element routeElement = e.element(ROUTE);LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId);if (routeElement != null ){ builder.setRoute(ElRegexUtil.removeComments(routeElement.getText())); Element bodyElement = e.element(BODY); if (bodyElement == null ){ String errMsg = StrUtil.format("If you have defined the tag <route>, then you must define the tag <body> in chain[{}]" , chainId); throw new FlowSystemException (errMsg); } builder.setEL(ElRegexUtil.removeComments(bodyElement.getText())); } else { Element bodyElement = e.element(BODY); if (bodyElement != null ){ builder.setEL(ElRegexUtil.removeComments(bodyElement.getText())); }else { builder.setEL(ElRegexUtil.removeComments(e.getText())); } }
路由体是新的处理,使用
builder.setRoute(ElRegexUtil.removeComments(routeElement.getText()))
构造相关的对象
而对于 body 则和旧有没有标签的逻辑是一致的
Build 路由体
见
com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder#setRoute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public LiteFlowChainELBuilder setRoute (String routeEl) { if (StrUtil.isBlank(routeEl)) { String errMsg = StrUtil.format("You have defined the label <route> but there is no el in the chain route[{}]." , chain.getChainId()); throw new FlowSystemException (errMsg); } List<String> errorList = new ArrayList <>(); try { DefaultContext<String, Object> context = new DefaultContext <>(); FlowBus.getNodeMap().keySet().forEach(nodeId -> context.put(nodeId, FlowBus.getNode(nodeId))); Executable routeExecutable = (Executable) EXPRESS_RUNNER.execute(routeEl, context, errorList, true , true ); if (!(routeExecutable instanceof AndOrCondition || routeExecutable instanceof NotCondition || routeExecutable instanceof Node)) { throw new RouteELInvalidException ("the route EL can only be a boolean node, or an AND or OR expression." ); } this .route = routeExecutable; return this ; } catch (QLException e) { if (ObjectUtil.isNotNull(e.getCause()) && Objects.equals(e.getCause().getMessage(), DataNotFoundException.MSG)) { String msg = buildDataNotFoundExceptionMsg(routeEl); throw new ELParseException (msg); } else if (ObjectUtil.isNotNull(e.getCause())) { throw new ELParseException (e.getCause().getMessage()); } else { throw new ELParseException (e.getMessage()); } } catch (RouteELInvalidException e) { throw e; } catch (Exception e) { String errMsg = StrUtil.format("parse el fail in this chain[{}];\r\n" , chain.getChainId()); throw new ELParseException (errMsg + e.getMessage()); } }
构造 EL 解析的上下文;这里只放入了
Node,说明路由体中不支持子流程
经过 ExpressRunner
,解析出 Executable
校验 Executable
只能是布尔组件、或者与或非
Condition
保存到 route
属性
Build Chain
1 2 3 4 5 6 7 8 9 public void build () { this .chain.setRouteItem(this .route); this .chain.setConditionList(this .conditionList); FlowBus.addChain(this .chain); }
见
com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder#build
执行 build
方法后会将路由体和规则设置为 Chain
对象的属性
将该 Chain 对象加入 FlowBus
总结
本质上就是从框架层面支持了顶层的判断表达,使用异步判断条件是否符合并且执行结果
路由和路由之间是隔离的,性能可能会存在问题
参考
🍽决策路由用法 |
LiteFlow