LiteFlow - 执行器初始化

自动装配

基于 SpringBoot 的实现方式自然使用到了 starter 机制,核心代码在 liteflow-spring-boot-starter 包下

首先来看 spring.factories 文件

1
2
3
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.yomahub.liteflow.springboot.config.LiteflowPropertyAutoConfiguration,\
com.yomahub.liteflow.springboot.config.LiteflowMainAutoConfiguration

可以看到一共有两个重要的自动装配的配置类:

  • LiteflowPropertyAutoConfiguration:配置属性的解析,封装为 LiteflowConfig 实例
  • LiteflowMainAutoConfiguration:LiteFlow 的核心功能对象在该配置类下实例化
    • SpringAware:Spring ApplicationContext 的获取与封装,便于后续业务中获取 bean 的操作
    • ComponentScanner:组件扫描器,用于扫描与封装 NodeComponent 节点的实现类
    • FlowExecutor:规则执行器
    • LiteflowExecutorInit:设置服务启动时初始化配置生效,用于启动时主动进行初始化操作
    • MonitorBus:监控类元数据

LiteflowPropertyAutoConfiguration

总的来说就是对配置属性的合并和封装,实例化 LiteflowConfig,便于后续业务的使用

合并了 liteflow 配置和 liteflow.monitor 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@EnableConfigurationProperties({LiteflowProperty.class, LiteflowMonitorProperty.class})
@PropertySource(
name = "Liteflow Default Properties",
value = "classpath:/META-INF/liteflow-default.properties")
public class LiteflowPropertyAutoConfiguration {

@Bean
public LiteflowConfig liteflowConfig(LiteflowProperty property, LiteflowMonitorProperty liteflowMonitorProperty){
LiteflowConfig liteflowConfig = new LiteflowConfig();
// ...
liteflowConfig.setPrintExecutionLog(property.isPrintExecutionLog());
liteflowConfig.setSubstituteCmpClass(property.getSubstituteCmpClass());
return liteflowConfig;
}
}

依赖前置 LiteflowPropertyLiteflowMonitorProperty bean,即流程配置参数和监控器的配置参数类

LiteflowMainAutoConfiguration

配置核心的各种实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class})
@ConditionalOnBean(LiteflowConfig.class)
@ConditionalOnProperty(prefix = "liteflow", name = "enable", havingValue = "true")
@Import(SpringAware.class)
public class LiteflowMainAutoConfiguration {

//实例化ComponentScanner
//多加一个SpringAware的意义是,确保在执行这个的时候,SpringAware这个bean已经被初始化
@Bean
public ComponentScanner componentScanner(LiteflowConfig liteflowConfig, SpringAware springAware){
return new ComponentScanner(liteflowConfig);
}

//实例化FlowExecutor
//多加一个SpringAware的意义是,确保在执行这个的时候,SpringAware这个bean已经被初始化
@Bean
public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig, SpringAware springAware) {
FlowExecutor flowExecutor = new FlowExecutor();
flowExecutor.setLiteflowConfig(liteflowConfig);
return flowExecutor;
}

//FlowExecutor的初始化工作,和实例化分开来
@Bean
@ConditionalOnProperty(prefix = "liteflow", name = "parse-on-start", havingValue = "true")
public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor) {
return new LiteflowExecutorInit(flowExecutor);
}

//实例化MonitorBus
//多加一个SpringAware的意义是,确保在执行这个的时候,SpringAware这个bean已经被初始化
@Bean("monitorBus")
@ConditionalOnProperty(prefix = "liteflow", name = "monitor.enable-log", havingValue = "true")
public MonitorBus monitorBus(LiteflowConfig liteflowConfig, SpringAware springAware) {
return new MonitorBus(liteflowConfig);
}
}

可以看出 bean 的加载顺序:LiteflowPropertyAutoConfiguration -> LiteflowConfig -> ComponentScanner & FlowExecutor & MonitorBus -> LiteflowExecutorInit

LiteflowExecutorInit 和 MonitorBus 依赖不同的配置条件,其中 liteflow.parse-on-start 默认是 true,liteflow.monitor.enable-log 默认是 false

ComponentScanner

ComponentScanner 在项目中也被缩写为 cmp,主要是依赖 Spring 的 BeanPostProcessor 来对 NodeComponent 的各种实现进行发现、保存、扩展等一系列操作

构造方法

配置类中使用了其构造方法进行实例化

1
2
3
4
5
6
7
public ComponentScanner(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;
if (liteflowConfig.getPrintBanner()) {
// 打印liteflow的LOGO
LOGOPrinter.print();
}
}

构造方法中主要进行了两个操作:

  1. 填充属性 liteflowConfig

  2. 打印 LOGO(可以看出这个是启动的第一个 bean);LOGOPrinter 就是一个打印 LOGO 的静态方法工具类

BeanPostProcessor

既然实现了 BeanPostProcessor 接口,那么重点就需要关注其对于 before 和 after 钩子方法的实现

1
2
3
4
5
6
7
8
9
public class ComponentScanner implements BeanPostProcessor {

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

// ....
}

before 方法是一个空实现,重点关注 after 方法

声明式组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//判断是不是声明式组件
//如果是,就缓存到类属性的map中
if (LiteFlowProxyUtil.isDeclareCmp(bean.getClass())) {
LOG.info("proxy component[{}] has been found", beanName);
List<NodeComponent> nodeComponents = LiteFlowProxyUtil.proxy2NodeComponent(bean, beanName);
nodeComponents.forEach(
nodeComponent -> {
String nodeId = nodeComponent.getNodeId();
nodeId = StrUtil.isEmpty(nodeId) ? beanName : nodeId;
nodeComponentMap.put(nodeId, nodeComponent);
}
);
// 只有注解支持单bean多Node,所以一个直接返回
if (nodeComponents.size() == 1) {
return nodeComponents.get(0);
}
return bean;
}

如何判断是否是一个声明式组件,实现方法是 LiteFlowProxyUtil.isDeclareCmp

1
2
3
4
5
6
7
8
9
10
11
/**
* 判断一个bean是否是声明式组件
*/
public static boolean isDeclareCmp(Class<?> clazz) {
// 查看bean里的method是否有方法标记了@LiteflowMethod标注
// 这里的bean有可能是cglib加强过的class,所以要先进行个判断
Class<?> targetClass = getUserClass(clazz);
// 判断是否有方法标记了@LiteflowMethod标注,有则为声明式组件
return Arrays.stream(targetClass.getMethods())
.anyMatch(method -> method.getAnnotation(LiteflowMethod.class) != null);
}
  1. 获取真实的用户定义类
  2. 判断类中的方法是否标记了 @LiteflowMethod

实现类组件

实现类组件有两种:

  • 继承自 NodeComponent 的逻辑组件
  • 实现自 ICmpAroundAspect 的切面组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 组件的扫描发现,扫到之后缓存到类属性map中
if (NodeComponent.class.isAssignableFrom(clazz)) {
LOG.info("component[{}] has been found", beanName);
NodeComponent nodeComponent = (NodeComponent) bean;
nodeComponentMap.put(beanName, nodeComponent);
return nodeComponent;
}

// 组件Aop的实现类加载
if (ICmpAroundAspect.class.isAssignableFrom(clazz)) {
LOG.info("component aspect implement[{}] has been found", beanName);
cmpAroundAspect = (ICmpAroundAspect) bean;
return cmpAroundAspect;
}

扫描组件的逻辑都是这类方式:

  1. 判断是否是其实现
  2. 转型
  3. 保存到对应引用或对象

可以看到对于逻辑组件,是一个 map 结构,存在多个组件;而对于切面组件,只能存在一个(事实上这段逻辑没有对实现进行判断,也就是说如果有多个实现的 bean 会进行覆盖,测试中也发现是这样)

脚本组件

脚本组件也有两种:

  • @ScriptBean 修饰的脚本 bean
  • @ScriptMethod 修饰的脚本方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 扫描@ScriptBean修饰的类
ScriptBean scriptBean = AnnoUtil.getAnnotation(clazz, ScriptBean.class);
if (ObjectUtil.isNotNull(scriptBean)) {
ScriptBeanProxy proxy = new ScriptBeanProxy(bean, clazz, scriptBean);
ScriptBeanManager.addScriptBean(scriptBean.value(), proxy.getProxyScriptBean());
return bean;
}

// 扫描@ScriptMethod修饰的类
List<Method> scriptMethods = Arrays.stream(clazz.getMethods()).filter(method -> {
ScriptMethod scriptMethod = AnnoUtil.getAnnotation(method, ScriptMethod.class);
return ObjectUtil.isNotNull(scriptMethod) && StrUtil.isNotEmpty(scriptMethod.value());
}).collect(Collectors.toList());
if (CollUtil.isNotEmpty(scriptMethods)) {
Map<String, List<Method>> scriptMethodsGroupByValue = CollStreamUtil.groupBy(scriptMethods, method -> {
ScriptMethod scriptMethod = AnnoUtil.getAnnotation(method, ScriptMethod.class);
return scriptMethod.value();
}, Collectors.toList());

for (Map.Entry<String, List<Method>> entry : scriptMethodsGroupByValue.entrySet()) {
String key = entry.getKey();
List<Method> methods = entry.getValue();
ScriptMethodProxy proxy = new ScriptMethodProxy(bean, clazz, methods);

ScriptBeanManager.addScriptBean(key, proxy.getProxyScriptMethod());

}

return bean;
}

主要是将不同的脚本形式包装成代理类,转换成 bean 后放入 ScriptBeanManager 进行管理

转换的核心在 getProxyScriptMethod 方法,这个会在脚本相关源码中进一步分析

FlowExecutor

流程规则主要执行器,也是一个规则表执行的主要入口

这里主要关注其构造流程,初始化在后面的 LiteflowExecutorInit 中讲解

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public FlowExecutor() {
// 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到
FlowExecutorHolder.setHolder(this);
// 初始化DataBus
DataBus.init();
}

public FlowExecutor(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;
// 把liteFlowConfig设到LiteFlowGetter中去
LiteflowConfigGetter.setLiteflowConfig(liteflowConfig);
// 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到
FlowExecutorHolder.setHolder(this);
if (BooleanUtil.isTrue(liteflowConfig.isParseOnStart())) {
this.init(true);
}
// 初始化DataBus
DataBus.init();
}

对于 Spring 的启动方式,使用了无参构造,随后将 liteflowConfig 用 set 方法进行赋值,带有参数的构造方法应该是为了非 Spring 环境准备的,因为在 Spring 环境中 liteflowConfig 使用了 properties 的机制,而 FlowExecutor 通过 @Bean 进行实例化

可以看到无参构造中主要进行了 DataBus.init() 操作

DataBus

数据 BUS,主要用来管理 Slot,用以分配和回收

Slot 即整个规则流程调用时的管理者,保存了整个规则进行中的数据、组件步骤等信息,每一次调用会生成新的 Slot,并且互相隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static ConcurrentHashMap<Integer, Slot> SLOTS;

private static ConcurrentLinkedQueue<Integer> QUEUE;

public static void init() {
// 如果 SLOTS 没有初始化
if (MapUtil.isEmpty(SLOTS)) {
// 获取 liteflowConfig,在 Spring 环境中这里会使用已经注入的 SpringAware 从 ApplicationContext 中获取
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 从配置中获取 slot size 参数
currentIndexMaxValue = liteflowConfig.getSlotSize();
// 初始化 SLOTS
SLOTS = new ConcurrentHashMap<>();
// 初始化 QUEUE
QUEUE = IntStream.range(0, currentIndexMaxValue)
.boxed()
.collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
}
}

这里 SLOTS 用来保存分配的序号和 Slot 对象,QUEUE 则是用来保存 index

即在并发的情况下,能够分配的运行资格由 QUEUE 控制,QUEUE 会根据并发情况进行扩容,具体的资格由 0 ~ n 的 int 值来表示

LiteflowExecutorInit

执行器初始化类,主要用于在启动时执行执行器的初始化方法,避免在运行执行器时第一次初始化而耗费时间

该类实例化的意义就是控制上面对象 FlowExecutor 的初始化操作,如果不在配置中设置启动初始化,则 FlowExecutor 的初始化工作会放在调用时进行 @ConditionalOnProperty(prefix = "liteflow", name = "parse-on-start", havingValue = "true")

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class LiteflowExecutorInit implements InitializingBean {

private final FlowExecutor flowExecutor;

public LiteflowExecutorInit(FlowExecutor flowExecutor) {
this.flowExecutor = flowExecutor;
}

@Override
public void afterPropertiesSet() throws Exception {
flowExecutor.init(true);
}

}

该类实现了 Spring 钩子 InitializingBean,afterPropertiesSet 进行 FlowExecutor 的初始化操作

FlowExecutor 的 init

LiteflowExecutorInit 的实例化是为了 init FlowExecutor,而 FlowExecutor 的初始化主要目的是 parse 规则文件

ID 生成器

1
2
// 进行id生成器的初始化
IdGeneratorHolder.init();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void init() {
try {
// 单例创建
INSTANCE = new IdGeneratorHolder();
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 获取配置项中的生成器类路径
String requestIdGeneratorClass = liteflowConfig.getRequestIdGeneratorClass();

RequestIdGenerator requestIdGenerator;
if (StrUtil.isBlank(requestIdGeneratorClass)) {
// 默认为 DefaultRequestIdGenerator
requestIdGenerator = new DefaultRequestIdGenerator();
}
else {
// 实例化自定义生成器
Class<RequestIdGenerator> idGenerateClass = (Class<RequestIdGenerator>) Class
.forName(requestIdGeneratorClass);
// 注册进 bean 容器
requestIdGenerator = ContextAwareHolder.loadContextAware().registerBean(idGenerateClass);
}
INSTANCE.setRequestIdGenerator(requestIdGenerator);
}
catch (Exception e) {
throw new RequestIdGeneratorException(e.getMessage());
}
}

IdGeneratorHolder 用于保存实例化后的 ID 生成器 RequestIdGenerator,是一个单例

基本流程就是获取配置项上的生成器类路径,然后通过反射进行实例化,保存到 IdGeneratorHolder 这个角色中,如果没有进行配置则使用默认实现 DefaultRequestIdGenerator

默认实现 DefaultRequestIdGenerator

1
2
3
4
5
6
public class DefaultRequestIdGenerator implements RequestIdGenerator {
@Override
public String generate() {
return IdUtil.fastSimpleUUID();
}
}

hutool 下的简单 UUID 生成

规则文件源路径

在上面流程中加载了逻辑组件,接下来就是解析规则表

规则路径为空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String ruleSource = liteflowConfig.getRuleSource();
if (StrUtil.isBlank(ruleSource)) {
// 查看有没有Parser的SPI实现
// 所有的Parser的SPI实现都是以custom形式放入的,且只支持xml形式
ServiceLoader<ParserClassNameSpi> loader = ServiceLoader.load(ParserClassNameSpi.class);
Iterator<ParserClassNameSpi> it = loader.iterator();
if (it.hasNext()) {
ParserClassNameSpi parserClassNameSpi = it.next();
ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName();
liteflowConfig.setRuleSource(ruleSource);
}
else {
// ruleSource为空,而且没有spi形式的扩展,那么说明真的没有ruleSource
// 这种情况有可能是基于代码动态构建的
return;
}
}

这一块逻辑是在判断当配置文件的规则源路径为空的情况,此时会使用 SPI 工具 ServiceLoader 去加载自定义实现的规则加载器去放置新的规则路径

项目文件和本地文件

1
2
3
4
5
6
7
8
9
10
// 如果有前缀的,则不需要再进行分割了,说明是一个整体
// 如果没有前缀,说明是本地文件,可能配置多个,所以需要分割
List<String> sourceRulePathList;
if (ReUtil.contains(PREFIX_FORMAT_CONFIG_REGEX, ruleSource)) {
sourceRulePathList = ListUtil.toList(ruleSource);
}
else {
String afterHandleRuleSource = ruleSource.replace(StrUtil.SPACE, StrUtil.EMPTY);
sourceRulePathList = ListUtil.toList(afterHandleRuleSource.split(",|;"));
}

LiteFlow 的特性可以支持从本地文件加载规则,所以在此处判断规则是来源于项目文件还是本地文件

根据规则前缀 String PREFIX_FORMAT_CONFIG_REGEX = "xml:|json:|yml:|el_xml:|el_json:|el_yml:" 的正则来匹配;如果是本地文件则根据 ,|; 来进行分割

将规则文件的路径都放置在 sourceRulePathList 这个集合中

规则文件解析

获取规则源后要对规则文件进行解析

复合规则配置

这里应该是实现不同格式规则加载的特性,需要进行相关配置 supportMultipleType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
FlowParser parser = null;
Set<String> parserNameSet = new HashSet<>();
List<String> rulePathList = new ArrayList<>();
for (String path : sourceRulePathList) {
try {
// 查找对应的解析器
parser = FlowParserProvider.lookup(path);
parserNameSet.add(parser.getClass().getName());
// 替换掉前缀标识(如:xml:/json:),保留剩下的完整地址
path = ReUtil.replaceAll(path, PREFIX_FORMAT_CONFIG_REGEX, "");
rulePathList.add(path);

// 支持多类型的配置文件,分别解析
if (BooleanUtil.isTrue(liteflowConfig.isSupportMultipleType())) {
// 解析文件
parser.parseMain(ListUtil.toList(path));
}
}
catch (CyclicDependencyException e) {
LOG.error(e.getMessage());
throw e;
}
catch (Exception e) {
String errorMsg = StrUtil.format("init flow executor cause error for path {},reason:{}", path,
e.getMessage());
LOG.error(e.getMessage(), e);
throw new FlowExecutorNotInitException(errorMsg);
}
}

流程如下:

  1. 遍历规则路径,查找对应的解析器
  2. 记录解析器类名(主要是用于后续非多类型解析器判断使用)
  3. 路径放入规则路径集合
  4. 如果支持多类型配置文件解析,则直接进行解析 parser.parseMain(ListUtil.toList(path))
  5. catch 异常,打印日志

单类型配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 单类型的配置文件,需要一起解析
if (BooleanUtil.isFalse(liteflowConfig.isSupportMultipleType())) {
// 检查Parser是否只有一个,因为多个不同的parser会造成子流程的混乱
if (parserNameSet.size() > 1) {
String errorMsg = "cannot have multiple different parsers";
LOG.error(errorMsg);
throw new MultipleParsersException(errorMsg);
}

// 进行多个配置文件的一起解析
try {
if (parser != null) {
// 解析文件
parser.parseMain(rulePathList);
}
else {
throw new ConfigErrorException("parse error, please check liteflow config property");
}
}
catch (CyclicDependencyException e) {
LOG.error(e.getMessage(), e);
LOG.error(e.getMessage());
throw e;
}
catch (ChainDuplicateException e) {
LOG.error(e.getMessage(), e);
throw e;
}
catch (Exception e) {
String errorMsg = StrUtil.format("init flow executor cause error for path {},reason: {}", rulePathList,
e.getMessage());
LOG.error(e.getMessage(), e);
throw new FlowExecutorNotInitException(errorMsg);
}
}

单类型的流程主要针对解析器数量进行了判断,也是使用 parser.parseMain(ListUtil.toList(path)) 进入解析流程

解析器

这里主要先来看 LocalXmlFlowELParser 的实现

1
2
3
4
5
6
7
8
9
public class LocalXmlFlowELParser extends XmlFlowELParser {

@Override
public void parseMain(List<String> pathList) throws Exception {
List<String> contentList = PathContentParserHolder.loadContextAware().parseContent(pathList);
parse(contentList);
}

}

PathContentParserHolder.loadContextAware().parseContent(pathList) 的操作用于获取文件中的内容,即规则的 EL 表达式等(就是读文件,不过会根据不同环境选择不同的实现)

最后执行 BaseXmlFlowParser 抽象类提供的 parse 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void parse(List<String> contentList) throws Exception {
if (CollectionUtil.isEmpty(contentList)) {
return;
}
List<Document> documentList = ListUtil.toList();
for (String content : contentList) {
// 解析 XML org.dom4j.DocumentHelper,转换为 Document 对象
Document document = DocumentHelper.parseText(content);
documentList.add(document);
}
// 解析 node
ParserHelper.parseNodeDocument(documentList);
// 解析 chain
ParserHelper.parseChainDocument(documentList, CHAIN_NAME_SET, this::parseOneChain);
}

ParserHelper

XML 格式的规则表达为例

解析 node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* xml 形式的主要解析过程
* @param documentList documentList
*/
public static void parseNodeDocument(List<Document> documentList) {
for (Document document : documentList) {
// 获取 root 标签
Element rootElement = document.getRootElement();
// 获取 nodes 标签
Element nodesElement = rootElement.element(NODES);
// 当存在<nodes>节点定义时,解析node节点
if (ObjectUtil.isNotNull(nodesElement)) {
List<Element> nodeList = nodesElement.elements(NODE);
String id, name, clazz, type, script, file, language;
for (Element e : nodeList) {
id = e.attributeValue(ID);
name = e.attributeValue(NAME);
clazz = e.attributeValue(_CLASS);
type = e.attributeValue(TYPE);
script = e.getText();
file = e.attributeValue(FILE);
language = e.attributeValue(LANGUAGE);

// 构建 node
NodePropBean nodePropBean = new NodePropBean().setId(id)
.setName(name)
.setClazz(clazz)
.setScript(script)
.setType(type)
.setFile(file)
.setLanguage(language);

ParserHelper.buildNode(nodePropBean);
}
}
}
}
  1. 从配置的 XML 规则中解析标签
  2. 找到 nodes 的标签;name 常量定义在 ChainConstant.NODES
  3. 遍历 nodes 下面的每一个 node 标签,构建 NodePropBean node 对象
  4. ParserHelper.buildNode(nodePropBean) 开始解析节点,解析完成后会放置在 FlowBus 中(解析过程中还在分辨是普通节点还是脚本节点,这个放到脚本相关源码中再看)

解析 chain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void parseChainDocument(List<Document> documentList, Set<String> chainNameSet,
Consumer<Element> parseOneChainConsumer) {
// 先在元数据里放上chain
// 先放有一个好处,可以在parse的时候先映射到FlowBus的chainMap,然后再去解析
// 这样就不用去像之前的版本那样回归调用
// 同时也解决了不能循环依赖的问题
documentList.forEach(document -> {
// 解析chain节点
List<Element> chainList = document.getRootElement().elements(CHAIN);

// 先在元数据里放上chain
chainList.forEach(e -> {
// 校验加载的 chainName 是否有重复的
// TODO 这里是否有个问题,当混合格式加载的时候,2个同名的Chain在不同的文件里,就不行了
String chainName = Optional.ofNullable(e.attributeValue(ID)).orElse(e.attributeValue(NAME));
if (!chainNameSet.add(chainName)) {
throw new ChainDuplicateException(String.format("[chain name duplicate] chainName=%s", chainName));
}

FlowBus.addChain(chainName);
});
});
// 清空
chainNameSet.clear();

// 解析每一个chain
for (Document document : documentList) {
Element rootElement = document.getRootElement();
List<Element> chainList = rootElement.elements(CHAIN);
chainList.forEach(parseOneChainConsumer);
}
}
  1. XML 规则中解析 chain 标签
  2. 遍历标签处理 chain 数据
  3. 获取 chainName,规则为如果没有配置独立的 id 就使用 name
  4. 加入 chainNameSet,如果 name 重复则抛出异常(这种方式无法处理多文件的重复问题,我理解带来的影响就是会导致解析的 chain 被覆盖)
  5. 先将 name 加入元数据管理类 FlowBus;这个流程是预装载,其元数据管理中的 map value 是占位对象
  6. 遍历 chain 标签,调用传入的 Consumer 方法开始解析;Consumer 方法为 Parser 实现的 parseOneChain 方法

EL 规则组装器

1
2
3
4
5
6
7
public static void parseOneChainEl(JsonNode chainNode) {
// 构建chainBuilder
String chainId = Optional.ofNullable(chainNode.get(ID)).orElse(chainNode.get(NAME)).textValue();
String el = chainNode.get(VALUE).textValue();
LiteFlowChainELBuilder chainELBuilder = LiteFlowChainELBuilder.createChain().setChainId(chainId);
chainELBuilder.setEL(el).build();
}

调用 parseOneChainEl 后主要是获取 el 规则,创建 LiteFlowChainELBuilder 对象后在 setEL 的步骤中会进行 EL 规则的解析,最终返回 List<Condition> conditionList

规则的解析使用了阿里开源的 QLExpress,后面会单独一篇来讲解

基本上到这里,相关的执行组件就已经初始化完毕了

后续具体的执行就是获取 Chain 对象的 conditionList,遍历 list 进行执行

MonitorBus

监控类元数据,打印执行器类

需要配置 condition 参数 monitor.enable-log

构造方法

1
2
3
4
5
6
7
8
9
10
11
// 定时线程池
private final ScheduledExecutorService printLogScheduler = Executors.newScheduledThreadPool(1);

public MonitorBus(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;

if (BooleanUtil.isTrue(liteflowConfig.getEnableLog())) {
this.printLogScheduler.scheduleAtFixedRate(new MonitorTimeTask(this), liteflowConfig.getDelay(),
liteflowConfig.getPeriod(), TimeUnit.MILLISECONDS);
}
}

向定时线程池设置任务,任务为 new MonitorTimeTask(this),是一个 TimerTask

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MonitorTimeTask extends TimerTask {

private MonitorBus monitorBus;

public MonitorTimeTask(MonitorBus monitorBus) {
this.monitorBus = monitorBus;
}

@Override
public void run() {
monitorBus.printStatistics();
}
}

run 方法为调用 MonitorBus.printStatistics

打印统计数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final ConcurrentHashMap<String, BoundedPriorityBlockingQueue<CompStatistics>> statisticsMap = new ConcurrentHashMap<>();

public void printStatistics() {
try {
Map<String, BigDecimal> compAverageTimeSpent = new HashMap<String, BigDecimal>();

for (Entry<String, BoundedPriorityBlockingQueue<CompStatistics>> entry : statisticsMap.entrySet()) {
long totalTimeSpent = 0;
for (CompStatistics statistics : entry.getValue()) {
totalTimeSpent += statistics.getTimeSpent();
}
compAverageTimeSpent.put(entry.getKey(), new BigDecimal(totalTimeSpent)
.divide(new BigDecimal(entry.getValue().size()), 2, RoundingMode.HALF_UP));
}

List<Entry<String, BigDecimal>> compAverageTimeSpentEntryList = new ArrayList<>(
compAverageTimeSpent.entrySet());

Collections.sort(compAverageTimeSpentEntryList, (o1, o2) -> o2.getValue().compareTo(o1.getValue()));

StringBuilder logStr = new StringBuilder();
logStr.append("以下为LiteFlow中间件统计信息:\n");
logStr.append("======================================================================================\n");
logStr.append("===================================SLOT INFO==========================================\n");
logStr.append(MessageFormat.format("SLOT TOTAL SIZE : {0}\n", liteflowConfig.getSlotSize()));
logStr.append(MessageFormat.format("SLOT OCCUPY COUNT : {0}\n", DataBus.OCCUPY_COUNT));
logStr.append("===============================TIME AVERAGE SPENT=====================================\n");
for (Entry<String, BigDecimal> entry : compAverageTimeSpentEntryList) {
logStr.append(MessageFormat.format("COMPONENT[{0}] AVERAGE TIME SPENT : {1}\n", entry.getKey(),
entry.getValue()));
}
logStr.append("======================================================================================\n");
LOG.info(logStr.toString());
}
catch (Exception e) {
LOG.error("print statistics cause error", e);
}
}

statisticsMap 是用来保存组件执行时上报的数据,在 printStatistics 方法中对数据进行整理和打印

主要数据:

  • slot 槽位现在的大小(用来衡量最大并发度)
  • slot 当前的占用数量(展示当前并发度)
  • 各个组件的平均耗时