1. 为什么"内存Join"是个无法绕过的话题
首先,我们先简单解释下,什么是“内存Join”。
相信大家对关系数据库的 join 语句肯定不陌生,其作用就是通过关联关系从多个表中查询数据,关联条件和数据聚合全部由 数据库服务完成。
而 内存 Join,简单来说就是把原本数据库帮我们完成的数据聚合操作迁移到应用服务,在应用服务的内存中完成。
数据库join非常简单,但随着系统的发展,内存join变得越来越重要,其核心驱动力有:
- 微服务。微服务要求“数据资产私有化”,也就是说每个服务的数据库是私有资产,不允许其他服务的直接访问。如果需要访问,只能通过服务所提供的接口完成
- 分库分表的限制。当数据量超过 MySQL 单实例承载能力时,通常会通过“分库分表”这一技术手段来解决,分库分表后,数据被分散到多个分区中,导致 join 语句失效
- 性能瓶颈。在高并发情况下,join 存在一定的性能问题,高并发、高性能端场景不适合使用。很多公司规范中对 join 的使用做出了明确的限制
2. 课程先导
发现变化,封装变化,管理变化,是开发人员的必备技能。
本篇文章从查询订单这个业务场景为入口,针对数据的内存join进行多次抽象和封装,最终实现“内存Join声明化”。
首先,先看下最终的效果,从直观上感受下“抽象”带来的效率提升。
通过抽象,可以达到如下效果:
- 左边一坨“模板代码” 等价于右边一个注解
- 模型需要绑定 UserVO 数据,只需使用 @JoinUserVOOnId 注解进行声明配置即可
- @JoinInMemoryConfig 注解的 PARALLEL 配置将开启多线程并行处理,以提供性能
神秘背后的本质便是“抽象”。让我们以订单查询为线索,层层递进,最终实现“能力声明化”。
能力声明化,是抽象的一种高级表现,无需编写代码,通过配置的方式为特定组件进行能力加强。
在正式开始之前,可以先了解下整体的推演流程:
3.【案例分析】订单查询
假设,我们是订单中心的一位研发伙伴,需要开发 “我的订单” 模块,其核心接口包括:
- 我的订单,查询用户的全部订单,包括 订单信息、用户信息、邮寄地址信息、商品信息等;
- 订单详情,查询某个订单的详细信息,包括 订单信息、用户信息、邮寄地址信息、商品信息、支付信息等;
根据需求定义 OrderService 接口如下:
public interface OrderService { // 我的订单 ListgetByUserId(Long userId); // 订单详情 OrderDetailVO getDetailByOrderId(Long orderId); } // 为配合多种实现策略,使用抽象类进行统一 public abstract class OrderListVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct(); } // 为配合多种实现策略,使用抽象类进行统一 public abstract class OrderDetailVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct(); public abstract List getPayInfo(); }
3.1. Foreach + 单条抓取方案
这么简单的需求,那不是信手拈来,很快就提供了一版
代码具体如下:
@Service public class OrderServiceCodingV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public ListgetByUserId(Long userId) { // 获取用户订单 List orders = this.orderRepository.getByUserId(userId); // 依次进行数据绑定 return orders.stream() .map(order -> convertToOrderListVO(order)) .collect(toList()); } private OrderListVOCodingV1 convertToOrderListVO(Order order) { OrderVO orderVO = OrderVO.apply(order); OrderListVOCodingV1 orderDetailVO = new OrderListVOCodingV1(orderVO); // 绑定地址信息 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); // 绑定用户信息 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); // 绑定商品信息 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); return orderDetailVO; } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { OrderDetailVOCodingV1 orderDetail = new OrderDetailVOCodingV1(OrderVO.apply(order)); // 获取地址并进行绑定 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetail.setAddress(addressVO); // 获取用户并进行绑定 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetail.setUser(userVO); // 获取商品并进行绑定 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetail.setProduct(productVO); // 获取支付信息并进行绑定 List payInfos = this.payInfoRepository.getByOrderId(order.getId()); List payInfoVOList = payInfos.stream() .map(PayInfoVO::apply) .collect(toList()); orderDetail.setPayInfo(payInfoVOList); return orderDetail; } }
如果真的这样实现,那你离“被跑路”不远了。
为什么会这么说呢?因为 ==“我的订单”这个接口存在严重的性能问题!==
“我的订单”接口具体实现如下:
- 查询 order 信息
- 依次对其进行数据抓取
- 完成数据绑定并返回结果
单个用户请求,数据库访问总次数 = 1(获取用户订单)+ N(订单数量) * 3(需要抓取的关联数据)
其中,N(订单数量) * 3(关联数据数量) 存在性能隐患,存在严重的==读放大效应==。一旦遇到忠实用户,存在成百上千订单,除了超时别无办法。
“订单详情”接口实现,目前问题不大,最大的问题为:“订单详情”与“我的订单”两个接口存在大量的重复逻辑!
3.2. 批量查询 + 内存Join
首先,我们先来解决 “我的订单”接口的性能问题。从之前的分析可知,性能低下的根本原因在于 “读放大效应”,数据库请求次数与用户订单数成正比,为了更好的保障性能,最好将数据库操作控制在一个常量。
整体思路为:先批量获取要绑定的数据,然后遍历每一个订单,在内存中完成数据绑定。
实现代码如下:
@Service public class OrderServiceCodingV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); // 批量获取用户,并依次进行绑定 List userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List users = this.userRepository.getByIds(userIds); Map userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } // 批量获取地址,并依次进行绑定 List addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List addresses = this.addressRepository.getByIds(addressIds); Map addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } // 批量获取商品,并依次进行绑定 List productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List products = this.productRepository.getByIds(productIds); Map productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暂时忽略 return orderDetail; } }
调整之后,对于“我的订单”接口,单个用户请求==数据库的访问次数变成了常量(4)==。
如果你是这么实现的,那恭喜你,你已步入合格程序员行列。
3.3. 并行批量查询 + 内存Join
批量查询+内存Join 方案能满足大部分场景,如果要抓取的数据太多,也就是数据库访问这个==常量变大==时,性能也会越来越差。
原因很简单,由于串行执行,整体耗时 = 获取订单耗时 + sum(抓取数据耗时)
聪明的同学早就跃跃欲试,这个我会:多线程并行执行呗。
是的,基于 Future 的实现如下(还有很多版本,比如 CountDownLatch)
整体设计如下:
示例代码如下:
@Service public class OrderServiceCodingV3 implements OrderService { private ExecutorService executorService; @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @PostConstruct public void init(){ // 初始化线程池(不要使用Executors,这里只是演示,需要对资源进行评估) this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows @Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); List > callables = Lists.newArrayListWithCapacity(3); // 创建异步任务 callables.add(() -> { // 批量获取用户,并依次进行绑定 List userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List users = this.userRepository.getByIds(userIds); Map userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } return null; }); // 创建异步任务 callables.add(() ->{ // 批量获取地址,并依次进行绑定 List addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List addresses = this.addressRepository.getByIds(addressIds); Map addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } return null; }); // 创建异步任务 callables.add(() -> { // 批量获取商品,并依次进行绑定 List productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List products = this.productRepository.getByIds(productIds); Map productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return null; }); // 执行异步任务 this.executorService.invokeAll(callables); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暂时忽略 } }
多线程并发执行,整体耗时 = 获取订单耗时 + max(抓取数据耗时)
如果你能够这样实现的,那恭喜你,你已步入高级程序员行列。
然后呢,到此为止了?NO,接下来才是高潮!!!
让我们打开认知,开启“抽象+封装”之旅。
4. Fetcher封装
仔细研究上述代码,寻找里面的==“变与不变”==,你会发现:
- 由于“我的订单” 和 “订单详情” 返回的是不同的 VO,导致在实现绑定操作时写了两套基本一样的逻辑;
- Address、User、Product 的绑定逻辑骨架是一样的,一些细节操作存在差异;
找到逻辑中的变化点,接下来便是有针对性的进行封装。
4.1. 消除方法中的重复代码
对于 “我的订单” 和 “订单详情” 返回==不同的 VO==,该怎么处理呢?
非常简单,思路如下:
- 【不变】抽象出“行为接口” Fetcher,统一操作行为
- 【变化】基于多态,不同的 VO 派生自相同的接口,但可以自己定义实现,从而实现个性化变化
整体设计如下:
简单示例如下:
// 以 UserVO 为例,ProductVO、AddressVO,PayInfoVO 基本一致,不在赘述 public interface UserVOFetcherV1 { Long getUserId(); void setUser(UserVO user); } // OrderDetailVO 实现对应的接口,为了突出重点暂时忽略具体实现 public class OrderDetailVOFetcherV1 extends OrderDetailVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1, PayInfoVOFetcherV1{ } // OrderListVO 实现对应接口,为了突出重点暂时忽略具体实现 public class OrderListVOFetcherV1 extends OrderListVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1 { }
有了统一的操作接口,接下来便是抽取具体的绑定逻辑,以 UserVOFetcherExecutor 为例:
@Component public class UserVOFetcherExecutorV1 { @Autowired private UserRepository userRepository; public void fetch(List extends UserVOFetcherV1> fetchers){ Listids = fetchers.stream() .map(UserVOFetcherV1::getUserId) .distinct() .collect(Collectors.toList()); List users = userRepository.getByIds(ids); Map userMap = users.stream() .collect(toMap(user -> user.getId(), Function.identity())); fetchers.forEach(fetcher -> { Long userId = fetcher.getUserId(); User user = userMap.get(userId); if (user != null){ UserVO userVO = UserVO.apply(user); fetcher.setUser(userVO); } }); } }
实现逻辑没有变化,最重要的变化在于“入参类型”,不在是具体的 VO,而是抽象的 UserVOFetcher 接口。
AddressVOFetcherExecutor、ProductVOFetcherExecutor、PayInfoVOFetcherExecutor 与 UserVOFetcherExecutorV1 逻辑基本一致,篇幅问题不在赘述。
这样一个小小的调整,会给使用方带来什么便利?一起看下使用方的变化:
@Service public class OrderServiceFetcherV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressVOFetcherExecutorV1 addressVOFetcherExecutorV1; @Autowired private ProductVOFetcherExecutorV1 productVOFetcherExecutorV1; @Autowired private UserVOFetcherExecutorV1 userVOFetcherExecutorV1; @Autowired private PayInfoVOFetcherExecutorV1 payInfoVOFetcherExecutorV1; @Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 直接使用 FetcherExecutor 完成数据绑定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List orderDetailVOS = Arrays.asList(orderDetail); // 直接使用 FetcherExecutor 完成数据绑定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; } }
两个方法直接使用 FetcherExecutor 完成数据抓取和绑定,实现了==绑定逻辑的复用==。
如果再有 VO 需要进行数据绑定,只需:
- VO 实现 XXXFetcher 接口,实现对应方法,提供关联数据并完成数据绑定
- 使用 XXXFetcherExecutor 完成数据绑定
至此,面对新业务基本上与“绑定逻辑”说再见了。
4.2. 重构绑定逻辑
接下来让我们一起聚焦于绑定逻辑,先对比下上述的UserVOFetcherExecutor 与下面的 AddressVOFetcherExecutor, 找到里面的变化与不变:
@Component public class AddressVOFetcherExecutorV1 { @Autowired private AddressRepository addressRepository; public void fetch(List extends AddressVOFetcherV1> fetchers){ // 获取关联信息 Listids = fetchers.stream() .map(AddressVOFetcherV1::getAddressId) .distinct() .collect(Collectors.toList()); // 查询关联数据 List addresses = addressRepository.getByIds(ids); // 转为为 Map Map addressMap = addresses.stream() .collect(toMap(address -> address.getId(), Function.identity())); // 依次进行数据绑定 fetchers.forEach(fetcher -> { Long addressId = fetcher.getAddressId(); Address address = addressMap.get(addressId); if (address != null){ // 转换为 VO AddressVO addressVO = AddressVO.apply(address); // 将数据写回到结果 fetcher.setAddress(addressVO); } }); } }
仔细观察,会发现:
【不变】逻辑骨架基本一致,基本是由:
- 获取关联信息
- 查询关联数据
- 将其转换为 Map
- 讲数据转化为 VO
- 将 VO 绑定到结果对象
【变化】实现细节存在差异;
- 从什么接口中获取关联信息
- 如何查询关联数据
- 转换为 Map 的键是什么
- 如何将数据转换为 VO
- 如何完成数据的绑定
熟悉设计模式的伙伴是否眼前一亮?停顿一下好好回想一下,哪种模式就是用来处理这种问题的?
答案便是:模板方法模式
整体思想为:
- 将不变的逻辑骨架封装在父类方法
- 将变化的实现细节放在子类中进行扩展
整体设计如下:
抽取公共父类如下:
abstract class BaseItemFetcherExecutorimplements ItemFetcherExecutor { @Override public void fetch(List fetchers) { // 获取关联信息 List ids = fetchers.stream() .map(this::getFetchId) .distinct() .collect(Collectors.toList()); // 查询关联数据 List datas = loadData(ids); // 转为为 Map Map > dataMap = datas.stream() .collect(groupingBy(this::getDataId)); // 依次进行数据绑定 fetchers.forEach(fetcher -> { Long id = getFetchId(fetcher); List ds = dataMap.get(id); if (ds != null){ // 转换为 VO List result = ds.stream() .map( data -> convertToVo(data)) .collect(Collectors.toList()); // 将数据写回到结果 setResult(fetcher, result); } }); } protected abstract Long getFetchId(FETCHER fetcher); protected abstract List loadData(List ids); protected abstract Long getDataId(DATA data); protected abstract RESULT convertToVo(DATA data); protected abstract void setResult(FETCHER fetcher, List result); }
基于 BaseItemFetcherExecutor 的 UserFetcherExecutor 如下:
@Component public class UserVOFetcherExecutorV2 extends BaseItemFetcherExecutor{ @Autowired private UserRepository userRepository; @Override protected Long getFetchId(UserVOFetcherV2 fetcher) { return fetcher.getUserId(); } @Override protected List loadData(List ids) { return this.userRepository.getByIds(ids); } @Override protected Long getDataId(User user) { return user.getId(); } @Override protected UserVO convertToVo(User user) { return UserVO.apply(user); } @Override protected void setResult(UserVOFetcherV2 fetcher, List userVO) { if (CollectionUtils.isNotEmpty(userVO)) { fetcher.setUser(userVO.get(0)); } } @Override public boolean support(Class cls) { // 暂时忽略,稍后会细讲 return UserVOFetcherV2.class.isAssignableFrom(cls); } }
UserVOFetcherExecutor究竟发生什么变化呢?好像变得更复杂了:
- 从代码量角度(行数)变得更多了,因为类函数明显变大
- 从复杂度角度(逻辑)变得更加简单,每个方法基本都是一两句语句
那我们究竟得到了什么好处?可以花几分钟好好思考一下!!!
在说结果之前,让我们看下另一个变化点。回想下 FetcherExecutor 的执行点,如下:
@Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 手工调用,OrderListVO 实现新接口,需要增加新的依赖和调用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List orderDetailVOS = Arrays.asList(orderDetail); // 手工调用,OrderDetailVO 实现新接口,需要增加新的依赖和调用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; }
其实,需要调用哪些 FetcherExecutor 完全可以由 VO 实现的接口来确定。也就是说,需要绑定新数据,只需 VO 继承并实现新的 Fetcher 接口即可。
对此,我们需要:
- 一个统一的访问入口,对外提供访问
- 每个 FetcherExecutor 能够识别 VO 并执行绑定逻辑
哪个设计模式是用来解决这个问题?花几分钟好好思考一下!
答案是:责任链模型
标准的责任链模式用起来比较繁琐,在 Spring 实现中大量使用他的一种变现,及提供一个验证接口,由组件自身完成判断,用于决定是否执行自身逻辑。
整体设计如下:
首先,为了统一 FetcherExecutor 的行为,抽取通用接口:
public interface ItemFetcherExecutor{ /** * 该组件是否能处理 cls 类型 * @param cls * @return */ boolean support(Class cls); /** * 执行真正的数据绑定 * @param fetchers */ void fetch(List fetchers); }
具体的实现,可以见 UserVOFetcherExecutorV2 的 support 方法:
@Override public boolean support(Classcls) { return UserVOFetcherV2.class.isAssignableFrom(cls); }
实现逻辑非常简单,只是判断 cls 是否实现了 UserVOFetcherV2 接口。
有了 FetcherExecutor 组件后,接下来就是为其提供统一的访问入口:
@Service public class FetcherService { @Autowired private ListitemFetcherExecutors; public void fetch(Class cls, List fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ this.itemFetcherExecutors.stream() // 是否能处理该类型 .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) // 执行真正的绑定 .forEach(itemFetcherExecutor -> itemFetcherExecutor.fetch(fetchers)); } } }
逻辑即为简单,依次遍历 FetcherExecutor,根据 support 执行结果,执行 fetch 逻辑。
【小常识】Spring 可以将容器中的全部实现直接注入到 List
OK,我们有了 FetcherService 提供统一的数据绑定能力,原来 OrderServiceFetcher 中 fetch 操作的变化点转移到 FetcherService,自身变得非常稳定。具体如下:
@Service public class OrderServiceFetcherV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private FetcherService fetcherService; @Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV2(OrderVO.apply(order))) .collect(toList()); // VO 数据绑定发生变化,只需调整 VO 实现接口,此处无需变化 fetcherService.fetch(OrderListVOFetcherV2.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV2 orderDetail = new OrderDetailVOFetcherV2(OrderVO.apply(order)); // VO 数据绑定发生变化,只需调整 VO 实现接口,此处无需变化 fetcherService.fetch(OrderDetailVOFetcherV2.class, Arrays.asList(orderDetail)); return orderDetail; } }
终于,我们将变化收敛到 VO 内,VO 需要绑定新的数据,只需实现对应接口即可。
4.3. 并发绑定
经过重构,代码结构变得非常清晰,如果想通过多线程并发方式提供性能,需要调整哪些组件呢?好好想想!!!
只需对FetcherService进行调整,让我们来一个并发版本,具体如下:
@Service public class ConcurrentFetcherService { private ExecutorService executorService; @Autowired private ListitemFetcherExecutors; @PostConstruct public void init(){ this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows public void fetch(Class cls, List fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ // 创建异步执行任务 List > callables = this.itemFetcherExecutors.stream() .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) .map(itemFetcherExecutor -> (Callable ) () -> { itemFetcherExecutor.fetch(fetchers); return null; }).collect(Collectors.toList()); // 线程池中并行执行 this.executorService.invokeAll(callables); } } }
OrderServiceFetcherV3 只需使用 ConcurrentFetcherService 替代 原来的 FetcherService 并拥有了并发能力。
5. 注解方案
5.1. 复杂配置 @JoinInMemory 来帮忙
纵观整个 Fetcher 封装,虽然结构清晰,但细节过于繁琐,特别是:
- 待抓取数据需要抽取 Fetcher 接口
- 需要提供自己的 FetcherExecutor 实现
- VO 需要实现多个 Fetcher 接口
这些不便将成为落地最大的阻碍,那有没有办法进行进一步简化?
这需要思考下这些设计背后的深层需求:
Fetcher接口目的包括
- 提供绑定信息
- 设置绑定结果
- 被 FetcherExecutor 识别并进行处理
FetcherExecutor设计的目标包括:
- 识别待处理的 Fetcher
- 定制个性化流程
所有这些需求是否可用 ==注解== 的方式实现?
- 在 VO 属性上增加注解,说明绑定结果写回到该属性上
- 注解配置来源属性,提供绑定信息
- 注解配置流程属性,完成 FetcherExecutor 的个性化定制
- 每个注解背后是一个 FetcherExecutor 实现,完成 FetcherExecutor 与 “Fetcher” 绑定
根据上述分析,注解可完成全部任务,新建注解如下:
@Target({ElementType.FIELD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface JoinInMemory { /** * 从 sourceData 中提取 key * @return */ String keyFromSourceData(); /** * 从 joinData 中提取 key * @return */ String keyFromJoinData(); /** * 批量数据抓取 * @return */ String loader(); /** * 结果转换器 * @return */ String joinDataConverter() default ""; /** * 运行级别,同一级别的 join 可 并行执行 * @return */ int runLevel() default 10; }
乍一看,需要配置的信息真多,其实大多数配置全部与 FetcherExecutor 实现相关。
abstract class AbstractJoinItemExecutorimplements JoinItemExecutor { /** * 从原始数据中生成 JoinKey * @param data * @return */ protected abstract JOIN_KEY createJoinKeyFromSourceData(SOURCE_DATA data); /** * 根据 JoinKey 批量获取 JoinData * @param joinKeys * @return */ protected abstract List getJoinDataByJoinKeys(List joinKeys); /** * 从 JoinData 中获取 JoinKey * @param joinData * @return */ protected abstract JOIN_KEY createJoinKeyFromJoinData(JOIN_DATA joinData); /** * 将 JoinData 转换为 JoinResult * @param joinData * @return */ protected abstract JOIN_RESULT convertToResult(JOIN_DATA joinData); /** * 将 JoinResult 写回至 SourceData * @param data * @param JoinResults */ protected abstract void onFound(SOURCE_DATA data, List JoinResults); /** * 未找到对应的 JoinData * @param data * @param joinKey */ protected abstract void onNotFound(SOURCE_DATA data, JOIN_KEY joinKey); @Override public void execute(List sourceDatas) { // 从源数据中提取 JoinKey List joinKeys = sourceDatas.stream() .filter(Objects::nonNull) .map(this::createJoinKeyFromSourceData) .filter(Objects::nonNull) .distinct() .collect(toList()); log.debug("get join key {} from source data {}", joinKeys, sourceDatas); // 根据 JoinKey 获取 JoinData List allJoinDatas = getJoinDataByJoinKeys(joinKeys); log.debug("get join data {} by join key {}", allJoinDatas, joinKeys); // 将 JoinData 以 Map 形式进行组织 Map > joinDataMap = allJoinDatas.stream() .filter(Objects::nonNull) .collect(groupingBy(this::createJoinKeyFromJoinData)); log.debug("group by join key, result is {}", joinDataMap); // 处理每一条 SourceData for (SOURCE_DATA data : sourceDatas){ // 从 SourceData 中 获取 JoinKey JOIN_KEY joinKey = createJoinKeyFromSourceData(data); if (joinKey == null){ log.warn("join key from join data {} is null", data); continue; } // 根据 JoinKey 获取 JoinData List joinDatasByKey = joinDataMap.get(joinKey); if (CollectionUtils.isNotEmpty(joinDatasByKey)){ // 获取到 JoinData, 转换为 JoinResult,进行数据写回 List joinResults = joinDatasByKey.stream() .filter(Objects::nonNull) .map(joinData -> convertToResult(joinData)) .collect(toList()); log.debug("success to convert join data {} to join result {}", joinDatasByKey, joinResults); onFound(data, joinResults); log.debug("success to write join result {} to source data {}", joinResults, data); }else { log.warn("join data lost by join key {} for source data {}", joinKey, data); // 为获取到 JoinData,进行 notFound 回调 onNotFound(data, joinKey); } } } }
JoinInMemory 注解属性和AbstractJoinItemExecutor基本一致,在此就不做赘述,我们先看下具体的使用方式:
@Data public class OrderDetailVOAnnV1 extends OrderDetailVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暂时忽略 } @Data public class OrderListVOAnnV1 extends OrderListVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暂时忽略 }
我们以 UserVO user 属性为例
@JoinInMemory 注解中大量使用 SpEL,不熟悉的伙伴可以自行网上进行检索。
其他部分不变,定义 OrderService 如下:
@Service public class OrderServiceAnnV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private JoinService joinService; @Override public ListgetByUserId(Long userId) { List orders = this.orderRepository.getByUserId(userId); List orderDetailVOS = orders.stream() .map(order -> new OrderListVOAnnV1(OrderVO.apply(order))) .collect(toList()); this.joinService.joinInMemory(OrderListVOAnnV1.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOAnnV1 orderDetail = new OrderDetailVOAnnV1(OrderVO.apply(order)); this.joinService.joinInMemory(OrderDetailVOAnnV1.class, Arrays.asList(orderDetail)); return orderDetail; } }
相对于 Fetcher 抽象,我们将 Fetcher、FetcherExecutor 全部配置化,并通过 注解的方式进行呈现,相对于 Coding 方案,注解方案更加灵活,工作量也更小。
5.2. 复杂配置 @Alias 来帮忙
相对于 Fetcher 封装,一个 @JoinInMemory 成功干掉了两个组件,但观其自身配置起来还是非常繁琐。比如,在订单查询这个场景,在 OrderListVO 和 OrderDetailVO 中都需要对 UserVO 进行数据绑定,观察两个注解,我们会发现很多重复配置:
//OrderListVO @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // OrderDetailVO @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user;
两个配置完全一样,细品之后会发现:
【变化】入参变化,读取的属性不同,只是本次恰巧相同而已
- OrderListVO 指的是 OrderListVO 属性 order 的id值
- OrderDetailVO 指的是 OrderDetailVO 属性 order 的值
【不变】处理逻辑不变
- keyFromJoinData 指的是 user对象的 id
- loader 指的是通过 userRepository 的 getByIds 加载数据
- joinDataConverter 指的是将 user 转换为 UserVO
【不变】
- 将绑定结果 UserVO 绑定到属性上(属性名不同没有影响)
对于不变部分如何进行统一管理?
自定义注解 结合 Spring @AliasFor 便可以解决这个问题,以 UserVO 为例:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) // 管理通用属性 @JoinInMemory(keyFromSourceData = "", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) public @interface JoinUserVOOnId { // 使用别名将 keyFromSourceData 的配置暴露出来 @AliasFor( annotation = JoinInMemory.class ) String keyFromSourceData(); }
新注解有如下几个特点:
- 在注解上使用 @JoinInMemory 注解完成对通用属性的配置
- 在自定义注解 JoinUserVOOnId 的 keyFromSourceData 属性上,添加 @AliasFor 注解,将配置暴露给使用方
有了自定义注解,使用变的非常方便:
@Data public class OrderListVOAnnV2 extends OrderListVO { private final OrderVO order; // 只需配置参数即可,其他配置由 JoinUserVOOnId 进行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user; } @Data public class OrderDetailVOAnnV2 extends OrderDetailVO { private final OrderVO order; // 只需配置参数即可,其他配置由 JoinUserVOOnId 进行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user; }
其他使用方式不变,但实现了逻辑简化:
- 新增绑定数据,只需自定义绑定注解
- VO 需新的绑定数据,只需在属性上添加绑定注解
5.3. 开启并发 @JoinInMemoryConfig 来帮忙
如果担心性能,可以一键开启并发绑定,示例如下:
@Data @JoinInMemoryConfig(executorType = JoinInMemeoryExecutorType.PARALLEL) public class OrderListVOAnnV3 extends OrderListVO { private final OrderVO order; @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user; @JoinAddressVOOnId(keyFromSourceData = "#{order.addressId}") private AddressVO address; @JoinProductVOOnId(keyFromSourceData = "#{order.productId}") private ProductVO product; }
JoinInMemoryConfig 配置如下:
6. 最佳实践
6.1.将定义注解视为最佳实践
@JoinInMemory 注解上配置的信息太多,如果直接在业务代码中使用,非常难以维护,当每个配置发生变化后,很难一次性修改到位。所以,建议只将他作为“原注解”使用。
整体思路详见:
6.2. 注意线程池隔离
对于不同的数据绑定需求,建议使用不同的线程池,从资源层面对不同功能进行隔离,从而将由于依赖接口发生阻塞导致线程耗尽所造成的影响控制在最小范围。
@JoinInMemoryConfig 的 executorName 属性配置的便是执行器名称,不配置直接使用 “defaultExecutor”,具体代码如下:
@Bean public ExecutorService defaultExecutor(){ BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder() .namingPattern("JoinInMemory-Thread-%d") .daemon(true) .build(); int maxSize = Runtime.getRuntime().availableProcessors() * 3; return new ThreadPoolExecutor(0, maxSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), basicThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); }
如需使用自定义线程池需:
- 自定义线程池,并将其注册到Spring 容器
- @JoinInMemoryConfig executorName 设置为线程池的 bean name
7. 小结
推导逻辑有点长不知道你get到多少,先简单回顾一下:
- 今天面对的问题是:如何在应用成进行数据 Join 操作;
- 我们以我的订单和订单详情两个接口为业务切入点,层层进行抽象,发现变化、封装变化、管理变化
- 首先是手写代码,包括 foreach+单条抓取,批量查询+内存Join,并行查询 + 内存Join。在这个层次基本没有抽象可言,存在大量重复代码,系统扩展性低
- 其次是 Fetcher方案,为了分离“变化”与“不变”抽取出 Fetcher 和 FetcherExecutor 两个接口,并使用模板方法和责任链模式对其进行抽象,提升系统的扩展性,但实现过于繁琐不便于推广
- 最后是注解方案,使用 @JoinInMemory 注解完成繁琐的配置工作,将通用配置保留在自定义注解进行统一管理,基于 @AliasFor 完成入参的配置,还可以使用 @JoinInMemoryConfig 开启并发处理
原文地址:https://www.toutiao.com/article/7306772567284417039/