Wow4j Wow4j
首页
个人使用说明书
后端开发
前端开发
测试开发
运维开发
大数据开发
产品&UI交互
团队管理
软技能
他山之石
开源产品
敬请期待
GitHub (opens new window)
首页
个人使用说明书
后端开发
前端开发
测试开发
运维开发
大数据开发
产品&UI交互
团队管理
软技能
他山之石
开源产品
敬请期待
GitHub (opens new window)
  • 概要
  • 面试八股文

  • 服务端小技巧合集

    • byte buddy 实现链路上所有方法耗时打印
    • 业务线程池不丢 traceId 的方法
    • 让你的java业务代码并发的调用,并正确的处理返回结果
      • 1 背景
      • 2 实现的demo
        • 2.1 工具类 ConcurrencyDataTagEnum.java
        • 2.2 ConcurrencyDataDTO.java
        • 2.3 ExecutorTemplate.java
        • 2.4 核心业务代码
    • 服务端常见线上问题整理与解决措施
    • 服务端日志打印最佳实践
    • 轻松正确理解并上手RESTful
    • 服务端业务线程池优雅使用
    • 服务端如何正确优雅使用流控平台
    • 服务端如何正确的使用分布式锁防止缓存击穿
    • 服务端接口设计最佳实践
  • Java基础

  • MySQL 相关

  • Redis 最佳实践指南

  • 文本搜索Elasticsearch

  • Kafka 最佳实践指南

  • 网络相关

  • 架构相关

  • 监控告警

  • 防爬风控

  • 稳定性 checklist

  • 效能工具

  • 后端开发
  • 服务端小技巧合集
timchen525
2023-02-01

让你的java业务代码并发的调用,并正确的处理返回结果

# 1 背景

实际的java web开发过程中,在业务处理的过程中,需要调用多次外部的服务(可能是http服务,也可能是rpc服务),而这写调用是可以并行的。然而,目前网上能找到如何编写这类代码的资料极少,对于初学者来说想要实现起来会有一定的难度。因此,本篇文章将给出一个简单易用的并行处理代码,希望对你有帮助。

# 2 实现的demo

# 2.1 工具类 ConcurrencyDataTagEnum.java

@Getter
@AllArgsConstructor
public enum ConcurrencyDataTagEnum {

    /**
     * 商品id与商品的映射标记
     */
    PRODUCT_ID_PRODUCT_MAP_TAG("ProductIdProductMap", "商品id与商品的映射标记"),

    /**
     * 商品id与品牌的映射标记
     */
    PRODUCT_ID_BRAND_MAP_TAG("ProductIdBrandMap", "商品id与品牌的映射标记"),

    /**
     * 商品id与类别list的映射标记
     */
    PRODUCT_ID_CATEGORY_LIST_MAP_TAG("ProductIdCategoryListMap", "品id与类别list的映射标记"),

    /**
     * 商品id与区域id的映射标记
     */
    PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG("ProductIdAreaZipAreaMap", "商品id与区域id的映射标记"),

    /**
     * 门店商品id与活动列表的映射标记
     */
    EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG("EffectiveStoreProductIdActivityESOMap", "门店商品id与活动列表的映射标记");

    private String key;

    private String value;

}

# 2.2 ConcurrencyDataDTO.java

@Data
public class ConcurrencyDataDTO<T> {

    /**
     * 数据的标签,便于后期在一堆并发数据中取得想要的数据
     */
    private ConcurrencyDataTagEnum concurrencyDataTagEnum;

    private T data;

    public static ConcurrencyDataDTO create(ConcurrencyDataTagEnum concurrencyDataTagEnum, Object data) {
        ConcurrencyDataDTO concurrencyDataDTO = new ConcurrencyDataDTO();
        concurrencyDataDTO.setConcurrencyDataTagEnum(concurrencyDataTagEnum);
        concurrencyDataDTO.setData(data);
        return concurrencyDataDTO;
    }
}

# 2.3 ExecutorTemplate.java

@Slf4j
public class ExecutorTemplate {

    private volatile ThreadPoolTaskExecutor executor = null;
    private volatile List<Future> futures = null;

    public ExecutorTemplate(ThreadPoolTaskExecutor executor) {
        this.futures = Collections.synchronizedList(new ArrayList<Future>());
        this.executor = executor;
    }

    public void submit(Runnable task) {
        Future future = executor.submit(task);
        futures.add(future);
        check(future);
    }

    public void submit(Callable<ConcurrencyDataDTO> task) {
        Future future = executor.submit(task);
        futures.add(future);
        check(future);
    }

    private void check(Future future) {
        if (future.isDone()) {
            // 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对异常时快速响应
            try {
                future.get();
            } catch (Throwable e) {
                // 取消完之后立马退出
                cancelAllFutures();
                throw new RuntimeException(e);
            }
        }
    }

    public synchronized List<ConcurrencyDataDTO> waitForResult() {
        List<ConcurrencyDataDTO> result = new ArrayList();
        RuntimeException exception = null;
        for (Future future : futures) {
            try {
                Object object = future.get();
                if (object instanceof ConcurrencyDataDTO) {
                    result.add((ConcurrencyDataDTO) object);
                } else {
                    log.warn("future.get result object type is'nt ConcurrencyDTO, return object:{}.", JsonUtils.toJson(object));
                }
            } catch (Throwable e) {
                exception = new RuntimeException(e);
                // 如果一个future出现了异常,就退出
                break;
            }
        }

        if (exception != null) {
            cancelAllFutures();
            throw exception;
        } else {
            return result;
        }
    }

    public void cancelAllFutures() {
        for (Future future : futures) {
            if (!future.isDone() && !future.isCancelled()) {
                future.cancel(true);
            }
        }
    }

    public void clear() {
        futures.clear();
    }

}

# 2.4 核心业务代码

我们将下述的4个本来想要串行的代码并行化:

  1. storeProductRelativeService.getProductIdProductMap(productIdSet))
  2. storeProductRelativeService.getProductIdBrandMap(productIdSet))
  3. storeProductRelativeService.getProductIdCategoryListMap(productIdSet))
  4. storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))
 ...
 Map<String, ProductDTO> productIdProductMap = null;
 Map<String, BrandDTO> productIdBrandMap = null;
 Map<String, List<CategoryDTO>> productIdCategoryListMap = null;
 Map<String, AreaProductDTO> productIdAreaZipAreaMap = null;
 Map<String, List<ActivityDataESO>> storeProductIdActivityListMap = null;
try {
            // 并发执行相关的rpc调用
            if (!CollectionUtils.isEmpty(productIdSet)) {
                executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_PRODUCT_MAP_TAG, storeProductRelativeService.getProductIdProductMap(productIdSet)));
                executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_BRAND_MAP_TAG, storeProductRelativeService.getProductIdBrandMap(productIdSet)));
                executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_CATEGORY_LIST_MAP_TAG, storeProductRelativeService.getProductIdCategoryListMap(productIdSet)));
                executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG, storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet)));
            }
            if (!CollectionUtils.isEmpty(storeProductIdSet)) {
                executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG, activityStoreProductMapsService.getEffectiveStoreProductIdActivityESOMap(storeProductIdSet)));
            }
            // 等待所有异步执行结果
            List<ConcurrencyDataDTO> concurrencyDataDTOList = executorTemplate.waitForResult();

            for (ConcurrencyDataDTO concurrencyDataDTO : concurrencyDataDTOList) {
                ConcurrencyDataTagEnum concurrencyDataTagEnum = concurrencyDataDTO.getConcurrencyDataTagEnum();
                switch (concurrencyDataTagEnum) {
                    case PRODUCT_ID_PRODUCT_MAP_TAG:
                        productIdProductMap = (Map<String, ProductDTO>) concurrencyDataDTO.getData();
                        break;
                    case PRODUCT_ID_BRAND_MAP_TAG:
                        productIdBrandMap = (Map<String, BrandDTO>) concurrencyDataDTO.getData();
                        break;
                    case PRODUCT_ID_CATEGORY_LIST_MAP_TAG:
                        productIdCategoryListMap = (Map<String, List<CategoryDTO>>) concurrencyDataDTO.getData();
                        break;
                    case PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG:
                        productIdAreaZipAreaMap = (Map<String, AreaProductDTO>) concurrencyDataDTO.getData();
                        break;
                    case EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG:
                        storeProductIdActivityListMap = (Map<String, List<ActivityDataESO>>) concurrencyDataDTO.getData();
                        break;
                    default:
                        log.warn("concurrencyDataTagEnum[{}] is unknown, return data:{}.", concurrencyDataTagEnum, JsonUtils.toJson(concurrencyDataDTO));
                        break;
                }
            }
        } catch (Exception e) {
            log.error("exception, error message:{}.", e.getMessage(), e);
            throw e;
        } finally {
            // 注意:一定要添加这个,不然会导致内存泄漏
            executorTemplate.clear();
        }
 // 处理上述业务的返回值
 // todo
 ...      

executorTemplate 这里面的线程池的参数配置,可以根据业务进行显示的配置。

上次更新: 2023/02/06, 09:35:40
业务线程池不丢 traceId 的方法
服务端常见线上问题整理与解决措施

← 业务线程池不丢 traceId 的方法 服务端常见线上问题整理与解决措施→

Theme by Vdoing | Copyright © 2022-2023 timchen525 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×