管道模式 - Pipeline

背景

管道模式(Pipeline Pattern)是责任链模式(Chain of Command Pattern)的变体;区别在于责任链是将处理器按照链条组织起来,待处理的上下文按照链条找到能够处理自己的处理器,一般只有唯一的一个处理器;而管道模式是链条中的每一个处理器都需要对上下文进行处理

目的 降低业务逻辑流程的耦合性,将整个过程中所有的处理器隔离开,更方便扩展流程上新的业务逻辑

现实世界类比 工厂的生产流水线,车架 -> 发送机 -> 外壳 -> 内饰 -> 总装 -> 质检,整辆车在传送带又一个环节运输至另一个环节,每个处理环节都对汽车处理自己的部分,最终产出成品

实践

模拟一个场景,比如订单服务内查询接口把最终订单实体组装为一个 VO

除了原有的实体属性,比如内部数据需要对优惠金额进行计算,外部数据需要补充用户信息、商品信息等

问题

如果将所有步骤抽为方法,则代码一般会实现为这样

1
2
3
4
5
6
7
public OrderInfoVo buildVO(OrderBo bo) {
final OrderInfoVo vo = this.wrapVo(bo);
this.calMitigateSum(vo);
this.buildUserInfo(vo);
this.buildCommodityInfo(vo);
return vo;
}

后面如果对 VO 的字段有了新的要求,比如增加物流信息;或者对生成 VO 的流程有了新的要求,比如根据查询条件进行缓存,就会有以下坏处:

  • buildVO 中实现缓存逻辑,违反了方法的单一职责,维护在外面可能又需要维护一个统一的入口
  • 将来每加入一个新的处理步骤或者删除某个步骤,都要修改 buildVO 方法

这种情况下可以使用管道模式解决以上缺点

实现

上下文

上下文维护着整个业务链条中间的结果和最终的结果数据

管道上下文父类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 传递到管道的上下文
*/
@Getter
@Setter
public class PipelineContext {

/**
* 处理开始时间
*/
private LocalDateTime startTime;

/**
* 处理结束时间
*/
private LocalDateTime endTime;

/**
* 获取数据名称
*/
public String getName() {
return this.getClass().getSimpleName();
}
}

具体的业务数据上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 业务上下文
*/
@Getter
@Setter
public class OrderVOContext extends PipelineContext {

/**
* 订单业务模型
*/
private OrderBO bo;

/**
* 订单 VO
*/
private OrderVO vo;

/**
* 模型创建出错时的错误信息
*/
private String errorMsg;

// 其他参数

@Override
public String getName() {
return "build OrderVO";
}
}

处理器

处理器父类

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 管道中的上下文处理器
*/
public interface ContextHandler<T extends PipelineContext> {

/**
* 处理输入的上下文数据
*
* @param context 处理时的上下文数据
* @return 返回 true 则表示由下一个 ContextHandler 继续处理,返回 false 则表示处理结束
*/
boolean handle(T context);
}

计算优惠金额

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@sl4j
public class CalMitigateSumHandler implements ContextHandler<OrderVOContext> {
@Override
public boolean handle(InstanceBuildContext context) {
log.info("--计算优惠金额--");
// 计算流程
double mitigateSum = 0.0;
context.getVo().setMitigateSum(mitigateSum);
return true;
}
}

构造用户信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@sl4j
public class buildUserInfoHandler implements ContextHandler<OrderVOContext> {
@Override
public boolean handle(InstanceBuildContext context) {
log.info("--构造用户信息--");
User user = userClient.getUserById(context.getBo().getUserId());
if (Objects.isNull(user)) {
context.setErrorMsg("查询用户信息为空,id=" + context.getBo().getUserId());
return false;
}
context.getVo().setUserName(user.getName());
context.getVo().setUserAge(user.getAge());
return true;
}
}

构造商品信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@sl4j
public class buildGoodsInfoHandler implements ContextHandler<OrderVOContext> {
@Override
public boolean handle(InstanceBuildContext context) {
log.info("--构造商品信息--");
Goods goods = goodsClient.getGoodsBySku(context.getBo().getGoodsSku());
if (Objects.isNull(goods)) {
context.setErrorMsg("查询商品信息为空,id=" + context.getBo().getGoodsSku());
return false;
}
context.getVo().setGoodsSku(goods.getSku());
context.getVo().setGoodsName(goods.getName());
context.getVo().setGoodsPrice(goods.getPrice());
return true;
}
}

组织处理器

现在业务 Context 和 Handler 都定义好了,那么使用什么方式将 Handler 组织起来呢?有如下几种:

  • Handler 对象持有下一个 Handler 的信息,比如使用 nextHandler 属性来保存下一个处理器;缺点是无法直观了解整个业务链条都有哪些处理器,并且增删处理器需要修改其他处理器的属性
  • 自定义注解将顺序信息和 Handler 的实现绑定;这样也无法直观了解到一段业务都有多少个处理器
  • 维护一个路由表,基于 Spring 的自动注入来实现和管理路由表,使用一个执行器角色只有处理器集合作为入口

构造路由表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 管道路由的配置
*/
@Configuration
public class PipelineRouteConfig implements ApplicationContextAware {

/**
* 数据类型->管道中处理器类型列表 的路由
*/
private static final
Map<Class<? extends PipelineContext>,
List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);

/*
* 在这里配置各种上下文类型对应的处理管道:键为上下文类型,值为处理器类型的列表
*/
static {
PIPELINE_ROUTE_MAP.put(OrderVOContext.class,
Arrays.asList(
CalMitigateSumHandler.class,
buildUserInfoHandler.class,
buildGoodsInfoHandler.class
));

// 将来其他 Context 的管道配置
}

/**
* 在 Spring 启动时,根据路由表生成对应的管道映射关系
*/
@Bean("pipelineRouteMap")
public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
return PIPELINE_ROUTE_MAP.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
}

/**
* 根据给定的管道中 ContextHandler 的类型的列表,构建管道
*/
private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
return entry.getValue()
.stream()
.map(appContext::getBean)
.collect(Collectors.toList());
}

private ApplicationContext appContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}

管道执行器

管道执行器根据传入的上下文数据的类型,找到其对应的管道,然后将上下文数据放入管道中去进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 管道执行器
*/
@Component
public class PipelineExecutor {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

/**
* 引用 PipelineRouteConfig 中的 pipelineRouteMap
*/
@Resource
private Map<Class<? extends PipelineContext>,
List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;

/**
* 同步处理输入的上下文数据
* 如果处理时上下文数据流通到最后一个处理器且最后一个处理器返回 true,则返回 true,否则返回 false
*
* @param context 输入的上下文数据
* @return 处理过程中管道是否畅通,畅通返回 true,不畅通返回 false
*/
public boolean acceptSync(PipelineContext context) {
Objects.requireNonNull(context, "上下文数据不能为 null");
// 拿到数据类型
Class<? extends PipelineContext> dataType = context.getClass();
// 获取数据处理管道
List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);

if (CollectionUtils.isEmpty(pipeline)) {
logger.error("{} 的管道为空", dataType.getSimpleName());
return false;
}

// 管道是否畅通
boolean lastSuccess = true;

for (ContextHandler<? super PipelineContext> handler : pipeline) {
try {
// 当前处理器处理数据,并返回是否继续向下处理
lastSuccess = handler.handle(context);
} catch (Throwable ex) {
lastSuccess = false;
logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
}

// 不再向下处理
if (!lastSuccess) { break; }
}

return lastSuccess;
}
}

使用管道

原来的 buildVO 可以引入管道来进行实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
public OrderInfoVo buildVO(OrderBo bo) {
OrderVOContext context = this.createContext(bo);
// 执行
boolean success = pipelineExecutor.acceptSync(data);

// 流程成功
if (success) {
return CommonResponse.success(data.getInstanceId());
}

logger.error("build vo 失败:{}", data.getErrorMsg());
return CommonResponse.failed(data.getErrorMsg());
}

增加处理器

如果需要在流程中增加新业务

创建新的处理器实现类

1
2
3
4
5
6
7
8
9
@Component
@sl4j
public class newTaskHandler implements ContextHandler<OrderVOContext> {
@Override
public boolean handle(InstanceBuildContext context) {
log.info("--执行新业务--");
return true;
}
}

修改手动维护的静态路由表

1
2
3
4
5
6
7
8
static {
PIPELINE_ROUTE_MAP.put(OrderVOContext.class,
Arrays.asList(
CalMitigateSumHandler.class,
buildUserInfoHandler.class,
newTaskHandler.class, // 流程中增加了新逻辑
buildGoodsInfoHandler.class
));

异步处理

对于步骤繁多的任务,很多时候我们更需要的是异步处理,比如某些耗时长的定时任务,管道处理异步化非常的简单

在 PipelineExecutor 中引入异步的处理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 管道线程池
*/
@Resource
private ThreadPoolTaskExecutor pipelineThreadPool; // 处理异步的线程池

/**
* 异步处理输入的上下文数据
*
* @param context 上下文数据
* @param callback 处理完成的回调
*/
public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {
pipelineThreadPool.execute(() -> {
boolean success = acceptSync(context);

if (callback != null) {
callback.accept(context, success);
}
});
}

总结

通过管道模式,可以大幅降低了系统的耦合度和提升了内聚程度与扩展性:

  • 优点

    • buildVO 所在的类只负责处理引导 BO 进入管道的业务流程,而不关注具体业务逻
    • PipelineExecutor 抽象定义,不关心业务细节
    • 每个 ContextHandler 只负责自己的业务,不需要知道链路结构,与其他处理器解耦
    • 对于处理器新增、删除、调整顺序等操作,只需要修改路由表,和业务逻辑脱离
  • 缺点

    • 大的 Context 对象,让业务数据粒度很粗(上帝对象)
    • 和策略一样,可能造成实现类膨胀
    • 错误信息需要保存在 Context 对象中,走完所有处理器才会将错误返回

参考

设计模式最佳套路2 —— 基于 Spring 实现管道模式的最佳实践 - 知乎 (zhihu.com)

责任链设计模式(职责链模式) (refactoringguru.cn)