让你的java业务代码并发的调用,并正确的处理返回结果
# 1 背景
实际的java web开发过程中,在业务处理的过程中,需要调用多次外部的服务(可能是http服务,也可能是rpc服务),而这写调用是可以并行的。然而,目前网上能找到如何编写这类代码的资料极少,对于初学者来说想要实现起来会有一定的难度。因此,本篇文章将给出一个简单易用的并行处理代码,希望对你有帮助。
# 2 实现的demo
# 2.1 工具类 ConcurrencyDataTagEnum.java
@Getter
@AllArgsConstructor
public enum ConcurrencyDataTagEnum {
/**
* 商品id与商品的映射标记
*/
PRODUCT_ID_PRODUCT_MAP_TAG("ProductIdProductMap", "商品id与商品的映射标记"),
/**
* 商品id与品牌的映射标记
*/
PRODUCT_ID_BRAND_MAP_TAG("ProductIdBrandMap", "商品id与品牌的映射标记"),
/**
* 商品id与类别list的映射标记
*/
PRODUCT_ID_CATEGORY_LIST_MAP_TAG("ProductIdCategoryListMap", "品id与类别list的映射标记"),
/**
* 商品id与区域id的映射标记
*/
PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG("ProductIdAreaZipAreaMap", "商品id与区域id的映射标记"),
/**
* 门店商品id与活动列表的映射标记
*/
EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG("EffectiveStoreProductIdActivityESOMap", "门店商品id与活动列表的映射标记");
private String key;
private String value;
}
# 2.2 ConcurrencyDataDTO.java
@Data
public class ConcurrencyDataDTO<T> {
/**
* 数据的标签,便于后期在一堆并发数据中取得想要的数据
*/
private ConcurrencyDataTagEnum concurrencyDataTagEnum;
private T data;
public static ConcurrencyDataDTO create(ConcurrencyDataTagEnum concurrencyDataTagEnum, Object data) {
ConcurrencyDataDTO concurrencyDataDTO = new ConcurrencyDataDTO();
concurrencyDataDTO.setConcurrencyDataTagEnum(concurrencyDataTagEnum);
concurrencyDataDTO.setData(data);
return concurrencyDataDTO;
}
}
# 2.3 ExecutorTemplate.java
@Slf4j
public class ExecutorTemplate {
private volatile ThreadPoolTaskExecutor executor = null;
private volatile List<Future> futures = null;
public ExecutorTemplate(ThreadPoolTaskExecutor executor) {
this.futures = Collections.synchronizedList(new ArrayList<Future>());
this.executor = executor;
}
public void submit(Runnable task) {
Future future = executor.submit(task);
futures.add(future);
check(future);
}
public void submit(Callable<ConcurrencyDataDTO> task) {
Future future = executor.submit(task);
futures.add(future);
check(future);
}
private void check(Future future) {
if (future.isDone()) {
// 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对异常时快速响应
try {
future.get();
} catch (Throwable e) {
// 取消完之后立马退出
cancelAllFutures();
throw new RuntimeException(e);
}
}
}
public synchronized List<ConcurrencyDataDTO> waitForResult() {
List<ConcurrencyDataDTO> result = new ArrayList();
RuntimeException exception = null;
for (Future future : futures) {
try {
Object object = future.get();
if (object instanceof ConcurrencyDataDTO) {
result.add((ConcurrencyDataDTO) object);
} else {
log.warn("future.get result object type is'nt ConcurrencyDTO, return object:{}.", JsonUtils.toJson(object));
}
} catch (Throwable e) {
exception = new RuntimeException(e);
// 如果一个future出现了异常,就退出
break;
}
}
if (exception != null) {
cancelAllFutures();
throw exception;
} else {
return result;
}
}
public void cancelAllFutures() {
for (Future future : futures) {
if (!future.isDone() && !future.isCancelled()) {
future.cancel(true);
}
}
}
public void clear() {
futures.clear();
}
}
# 2.4 核心业务代码
我们将下述的4个本来想要串行的代码并行化:
- storeProductRelativeService.getProductIdProductMap(productIdSet))
- storeProductRelativeService.getProductIdBrandMap(productIdSet))
- storeProductRelativeService.getProductIdCategoryListMap(productIdSet))
- storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))
...
Map<String, ProductDTO> productIdProductMap = null;
Map<String, BrandDTO> productIdBrandMap = null;
Map<String, List<CategoryDTO>> productIdCategoryListMap = null;
Map<String, AreaProductDTO> productIdAreaZipAreaMap = null;
Map<String, List<ActivityDataESO>> storeProductIdActivityListMap = null;
try {
// 并发执行相关的rpc调用
if (!CollectionUtils.isEmpty(productIdSet)) {
executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_PRODUCT_MAP_TAG, storeProductRelativeService.getProductIdProductMap(productIdSet)));
executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_BRAND_MAP_TAG, storeProductRelativeService.getProductIdBrandMap(productIdSet)));
executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_CATEGORY_LIST_MAP_TAG, storeProductRelativeService.getProductIdCategoryListMap(productIdSet)));
executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG, storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet)));
}
if (!CollectionUtils.isEmpty(storeProductIdSet)) {
executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG, activityStoreProductMapsService.getEffectiveStoreProductIdActivityESOMap(storeProductIdSet)));
}
// 等待所有异步执行结果
List<ConcurrencyDataDTO> concurrencyDataDTOList = executorTemplate.waitForResult();
for (ConcurrencyDataDTO concurrencyDataDTO : concurrencyDataDTOList) {
ConcurrencyDataTagEnum concurrencyDataTagEnum = concurrencyDataDTO.getConcurrencyDataTagEnum();
switch (concurrencyDataTagEnum) {
case PRODUCT_ID_PRODUCT_MAP_TAG:
productIdProductMap = (Map<String, ProductDTO>) concurrencyDataDTO.getData();
break;
case PRODUCT_ID_BRAND_MAP_TAG:
productIdBrandMap = (Map<String, BrandDTO>) concurrencyDataDTO.getData();
break;
case PRODUCT_ID_CATEGORY_LIST_MAP_TAG:
productIdCategoryListMap = (Map<String, List<CategoryDTO>>) concurrencyDataDTO.getData();
break;
case PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG:
productIdAreaZipAreaMap = (Map<String, AreaProductDTO>) concurrencyDataDTO.getData();
break;
case EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG:
storeProductIdActivityListMap = (Map<String, List<ActivityDataESO>>) concurrencyDataDTO.getData();
break;
default:
log.warn("concurrencyDataTagEnum[{}] is unknown, return data:{}.", concurrencyDataTagEnum, JsonUtils.toJson(concurrencyDataDTO));
break;
}
}
} catch (Exception e) {
log.error("exception, error message:{}.", e.getMessage(), e);
throw e;
} finally {
// 注意:一定要添加这个,不然会导致内存泄漏
executorTemplate.clear();
}
// 处理上述业务的返回值
// todo
...
executorTemplate 这里面的线程池的参数配置,可以根据业务进行显示的配置。
上次更新: 2023/02/06, 09:35:40