背景
管道模式(Pipeline Pattern)是责任链模式(Chain of Command
Pattern)的变体;区别在于责任链是将处理器按照链条组织起来,待处理的上下文按照链条找到能够处理自己的处理器,一般只有唯一的一个处理器;而管道模式是链条中的每一个处理器都需要对上下文进行处理
目的
降低业务逻辑流程的耦合性,将整个过程中所有的处理器隔离开,更方便扩展流程上新的业务逻辑
现实世界类比 工厂的生产流水线,车架 -> 发送机
-> 外壳 -> 内饰 -> 总装 ->
质检,整辆车在传送带又一个环节运输至另一个环节,每个处理环节都对汽车处理自己的部分,最终产出成品
实践
模拟一个场景,比如订单服务内查询接口把最终订单实体组装为一个 VO
除了原有的实体属性,比如内部数据需要对优惠金额进行计算,外部数据需要补充用户信息、商品信息等
问题
如果将所有步骤抽为方法,则代码一般会实现为这样
1 2 3 4 5 6 7
| public OrderInfoVo buildVO(OrderBo bo) { final OrderInfoVo vo = this.wrapVo(bo); this.calMitigateSum(vo); this.buildUserInfo(vo); this.buildCommodityInfo(vo); return vo; }
|
后面如果对 VO 的字段有了新的要求,比如增加物流信息;或者对生成 VO
的流程有了新的要求,比如根据查询条件进行缓存,就会有以下坏处:
- 在
buildVO
中实现缓存逻辑,违反了方法的单一职责,维护在外面可能又需要维护一个统一的入口
- 将来每加入一个新的处理步骤或者删除某个步骤,都要修改
buildVO
方法
这种情况下可以使用管道模式解决以上缺点
实现
上下文
上下文维护着整个业务链条中间的结果和最终的结果数据
管道上下文父类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Getter @Setter public class PipelineContext {
private LocalDateTime startTime;
private LocalDateTime endTime;
public String getName() { return this.getClass().getSimpleName(); } }
|
具体的业务数据上下文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Getter @Setter public class OrderVOContext extends PipelineContext {
private OrderBO bo;
private OrderVO vo;
private String errorMsg;
@Override public String getName() { return "build OrderVO"; } }
|
处理器
处理器父类
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public interface ContextHandler<T extends PipelineContext> {
boolean handle(T context); }
|
计算优惠金额
1 2 3 4 5 6 7 8 9 10 11 12
| @Component @sl4j public class CalMitigateSumHandler implements ContextHandler<OrderVOContext> { @Override public boolean handle(InstanceBuildContext context) { log.info("--计算优惠金额--"); double mitigateSum = 0.0; context.getVo().setMitigateSum(mitigateSum); return true; } }
|
构造用户信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component @sl4j public class buildUserInfoHandler implements ContextHandler<OrderVOContext> { @Override public boolean handle(InstanceBuildContext context) { log.info("--构造用户信息--"); User user = userClient.getUserById(context.getBo().getUserId()); if (Objects.isNull(user)) { context.setErrorMsg("查询用户信息为空,id=" + context.getBo().getUserId()); return false; } context.getVo().setUserName(user.getName()); context.getVo().setUserAge(user.getAge()); return true; } }
|
构造商品信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component @sl4j public class buildGoodsInfoHandler implements ContextHandler<OrderVOContext> { @Override public boolean handle(InstanceBuildContext context) { log.info("--构造商品信息--"); Goods goods = goodsClient.getGoodsBySku(context.getBo().getGoodsSku()); if (Objects.isNull(goods)) { context.setErrorMsg("查询商品信息为空,id=" + context.getBo().getGoodsSku()); return false; } context.getVo().setGoodsSku(goods.getSku()); context.getVo().setGoodsName(goods.getName()); context.getVo().setGoodsPrice(goods.getPrice()); return true; } }
|
组织处理器
现在业务 Context 和 Handler 都定义好了,那么使用什么方式将 Handler
组织起来呢?有如下几种:
- Handler 对象持有下一个 Handler 的信息,比如使用
nextHandler
属性来保存下一个处理器;缺点是无法直观了解整个业务链条都有哪些处理器,并且增删处理器需要修改其他处理器的属性
- 自定义注解将顺序信息和 Handler
的实现绑定;这样也无法直观了解到一段业务都有多少个处理器
- 维护一个路由表,基于 Spring
的自动注入来实现和管理路由表,使用一个执行器角色只有处理器集合作为入口
构造路由表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
@Configuration public class PipelineRouteConfig implements ApplicationContextAware {
private static final Map<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);
static { PIPELINE_ROUTE_MAP.put(OrderVOContext.class, Arrays.asList( CalMitigateSumHandler.class, buildUserInfoHandler.class, buildGoodsInfoHandler.class ));
}
@Bean("pipelineRouteMap") public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() { return PIPELINE_ROUTE_MAP.entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline)); }
private List<? extends ContextHandler<? extends PipelineContext>> toPipeline( Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) { return entry.getValue() .stream() .map(appContext::getBean) .collect(Collectors.toList()); }
private ApplicationContext appContext;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { appContext = applicationContext; } }
|
管道执行器
管道执行器根据传入的上下文数据的类型,找到其对应的管道,然后将上下文数据放入管道中去进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@Component public class PipelineExecutor {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource private Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;
public boolean acceptSync(PipelineContext context) { Objects.requireNonNull(context, "上下文数据不能为 null"); Class<? extends PipelineContext> dataType = context.getClass(); List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);
if (CollectionUtils.isEmpty(pipeline)) { logger.error("{} 的管道为空", dataType.getSimpleName()); return false; }
boolean lastSuccess = true;
for (ContextHandler<? super PipelineContext> handler : pipeline) { try { lastSuccess = handler.handle(context); } catch (Throwable ex) { lastSuccess = false; logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex); }
if (!lastSuccess) { break; } }
return lastSuccess; } }
|
使用管道
原来的 buildVO
可以引入管道来进行实现了
1 2 3 4 5 6 7 8 9 10 11 12 13
| public OrderInfoVo buildVO(OrderBo bo) { OrderVOContext context = this.createContext(bo); boolean success = pipelineExecutor.acceptSync(data);
if (success) { return CommonResponse.success(data.getInstanceId()); }
logger.error("build vo 失败:{}", data.getErrorMsg()); return CommonResponse.failed(data.getErrorMsg()); }
|
增加处理器
如果需要在流程中增加新业务
创建新的处理器实现类
1 2 3 4 5 6 7 8 9
| @Component @sl4j public class newTaskHandler implements ContextHandler<OrderVOContext> { @Override public boolean handle(InstanceBuildContext context) { log.info("--执行新业务--"); return true; } }
|
修改手动维护的静态路由表
1 2 3 4 5 6 7 8
| static { PIPELINE_ROUTE_MAP.put(OrderVOContext.class, Arrays.asList( CalMitigateSumHandler.class, buildUserInfoHandler.class, newTaskHandler.class, buildGoodsInfoHandler.class ));
|
异步处理
对于步骤繁多的任务,很多时候我们更需要的是异步处理,比如某些耗时长的定时任务,管道处理异步化非常的简单
在 PipelineExecutor 中引入异步的处理方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Resource private ThreadPoolTaskExecutor pipelineThreadPool;
public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) { pipelineThreadPool.execute(() -> { boolean success = acceptSync(context);
if (callback != null) { callback.accept(context, success); } }); }
|
总结
通过管道模式,可以大幅降低了系统的耦合度和提升了内聚程度与扩展性:
优点
buildVO
所在的类只负责处理引导 BO
进入管道的业务流程,而不关注具体业务逻
PipelineExecutor
抽象定义,不关心业务细节
- 每个
ContextHandler
只负责自己的业务,不需要知道链路结构,与其他处理器解耦
- 对于处理器新增、删除、调整顺序等操作,只需要修改路由表,和业务逻辑脱离
缺点
- 大的 Context 对象,让业务数据粒度很粗(上帝对象)
- 和策略一样,可能造成实现类膨胀
- 错误信息需要保存在 Context 对象中,走完所有处理器才会将错误返回
参考
设计模式最佳套路2 ——
基于 Spring 实现管道模式的最佳实践 - 知乎 (zhihu.com)
责任链设计模式(职责链模式)
(refactoringguru.cn)