关键配置:
/**
* 代理类对象
*/
YrSyncWrService yrSyncWrServiceProxy;
// 获取代理类的对象,调用本类方法时,注解才会生效。比如@Async多线程,@Transactional事务控制
yrSyncWrServiceProxy = SpringUtil.getBean(YrSyncWrServiceImpl.class);
// 创建指定多线程处理批次数大小数组
CompletableFuture<Map<String, Integer>>[] completableFutureArr = new CompletableFuture[batchNums];
CompletableFuture<Map<String, Integer>> completableFuture;
// 每次遍历数量为 batchQueryNum 的记录,分批进行多线程处理
for (int batchNum = 0; batchNum < batchNums; batchNum++) {
// 如果异步标识没值或为false
if (asyncFlag == null || !asyncFlag) {
// 同步指定类型网元(没有用代理类调用本类方法,则多线程不起作用,视为单线程处理)
completableFuture = syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime);
} else {
// 按批次号同步指定类型网元(多线程异步处理)//异步标志.true:异步;false或NULL:同步.
completableFuture = yrSyncWrServiceProxy.syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime);
}
completableFutureArr[batchNum] = completableFuture;
}
// 等待所有任务都执行完
CompletableFuture.allOf(completableFutureArr).join();
CompletableFuture<Map<String, Integer>> cf;
// 统计成功条数
for (int i = 0; i < completableFutureArr.length; i++) {
cf = completableFutureArr[i];
Map<String, Integer> resultMap = null;
try {
resultMap = cf.get();
} catch (InterruptedException e) {
wrAlarmTipsLogService.generateAlarmTipsLog("wr_sync_finish_" + syncNetEleType + "_add,error", syncNetEleType, e.getMessage(), "luhuiping");
Thread.currentThread().interrupt();//捕获到InterruptedException异常后恢复中断状态
log.info("sync_fail (600002) 综资同步到无线资源,正在同步当前网元【{}】,异常错误信息【{}】",
syncNetEleType ,e.getMessage());
e.printStackTrace();
} catch (ExecutionException e) {
log.info("sync_fail (600003) 综资同步到无线资源,正在同步当前网元【{}】,异常错误信息【{}】",
syncNetEleType, e.getMessage());
e.printStackTrace();
}
if (!ObjectUtils.isNullObj(resultMap) && !resultMap.isEmpty()) {
succCnt += resultMap.get("succCnt");
// updateCnt += resultMap.get("updateCnt");
excludeCnt += resultMap.get("excludeCnt");
succFailCnt += resultMap.get("succFailCnt");
// updateFailCnt += resultMap.get("updateFailCnt");
}
}
/**
* 按批次号同步指定类型网元(多线程异步处理)
*
* @param
* @return java.util.concurrent.CompletableFuture<java.lang.String>
* @author liangzhaolin
* @date 2020/7/28 16:45
*/
// @Async
// @Async("syncOneNetEleTypeExecutor")
@Override
public CompletableFuture<Map<String, Integer>> syncOneNetEleTypeByBatchNum(Class<?> netEleClass, List<MetaWrFieldMap> metaWrFieldMapList, String netEleType, int batchNum, List<String> netElementNameList, String lastUpdateTime) {
try {
long startTime = System.currentTimeMillis();
// 开始行数
// int startRow = (batchNum * batchQueryNum) + 1;
int startRow = (batchNum * BATCH_QUERY_NUM) + 1;
// 结束行数
// int endRow = (batchNum + 1) * batchQueryNum;
int endRow = (batchNum + 1) * BATCH_QUERY_NUM;
// 处理结果 新增
int resultAdd = 0;
//更新
int resultUpdate = 0;
//不处理结果
int resultExclude = 0;
// 新增成功行数
int resultAddFail = 0;
//不处理条数 Station
int resultUpdateFail = 0;
// 排列顺序.0:正序;1:倒序.默认0.
int sortType = 0;
List<Map<String, Object>> yrInfoMapList;
// 获取实体类名称
String netEleClassName = netEleClass.getSimpleName();
// 获取对应网元mapper
mapper = (BaseMapper) SpringUtil.getBean(StringUtils.firstToLowerCase(netEleClassName) + "Mapper");
// 查询指定行数区间的亿阳综资记录, 指定同步数据
if (netElementNameList != null && netElementNameList.size() > 0) {
yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, netElementNameList, null);
}else{
yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, null, null);
}
if(yrInfoMapList == null || yrInfoMapList.size() <= 0){
log.info("sync_fail (600004) 综资同步到无线资源,正在同步当前网元【{}】, 查询综资数据为【{}】",
netEleType ,yrInfoMapList);
}
log.info("sync_info (500008) 综资同步到无线资源,正在同步当前网元【{}】, 查询综资数据为【{}】",
netEleType ,yrInfoMapList.toString());
// 遍历每条亿阳综资查询数据进行处理
for (Map<String, Object> yrInfoMap : yrInfoMapList) {
log.info("sync_info (500009) 综资同步到无线资源,正在同步当前网元【{}】, 查询综资遍历数据【{}】", netEleType, yrInfoMap);
try {
//当前数据是否存在无线资源库中,如果存在,综资的最后修改时间是否大于无线资源的最后修改时间
//大于则修改
int saveOrExclude = wrSyncDateUpdateConfirm(yrInfoMap, netEleType);
log.info("sync_info (500010) 综资同步到无线资源,正在同步当前网元【{}】, 判断新增或修改 1:需要修改 2:不操作 3:新增 4:不操作【{}】", netEleType, saveOrExclude);
// //1:不操作 0:新增
if(saveOrExclude == 0){
//0:新增 并插入日志
resultAdd = syncOneNetEleTypeWithTransaction(netEleClass, netEleType, metaWrFieldMapList, yrInfoMap, saveOrExclude);
if(resultAdd == 0){
resultAddFail++;
log.info("sync_fail (600006) 综资同步到无线资源,正在同步当前网元【{}】, 数据同步新增失败【{}】,新增失败状态【{}】",
netEleType ,yrInfoMap ,saveOrExclude);
}else if(resultAdd == 1){
//新增成功把综资id保存到map集合中
if("station".equals(netEleType)){
gatherStationMap.put(String.valueOf(yrInfoMap.get("空间资源ID")), String.valueOf(yrInfoMap.get("空间资源名称")));
}if("room".equals(netEleType)){
gatherRoomMap.put(String.valueOf(yrInfoMap.get("空间资源ID")), String.valueOf(yrInfoMap.get("空间资源名称")));
}
log.info("sync_info (600601) 综资同步到无线资源新增成功,正在同步当前网元【{}】,空间资源ID【{}】,空间资源名称【{}】",
netEleType, String.valueOf(yrInfoMap.get("空间资源ID")), String.valueOf(yrInfoMap.get("空间资源名称")));
resultAdd++;
}
}else if(saveOrExclude == 1){
//不处理(不符合新增或修改要求)
resultExclude++;
}
} catch (Exception e) {
if (e instanceof DuplicateKeyException) {
log.info("sync_fail (600007) 综资同步到无线资源,正在同步当前网元【{}】,数据同步新增失败-1【{}】,异常错误信息【{}】",
netEleType ,yrInfoMap ,e.getMessage());
// do nothing
// 如果入库报错(目前估计可能引起入库报错的情况只有,重跑程序时,插入wr_yr_sync_log日志表时会有冲突而引起的报错回滚),do nothing,keep running
} else {
log.info("sync_fail (600008) 综资同步到无线资源,正在同步当前网元【{}】,数据同步新增失败-2【{}】,异常错误信息【{}】",
netEleType, yrInfoMap, e.getMessage());
}
}
}
Map<String, Integer> cfMap = new HashMap<>();
cfMap.put("succCnt", resultAdd);//处理条数
cfMap.put("excludeCnt", resultExclude);//不处理条数
cfMap.put("succFailCnt", resultAddFail);//更新失败条数
// 返回本批处理的记录成功行数和总行数
CompletableFuture<Map<String, Integer>> completableFuture = CompletableFuture.completedFuture(cfMap);
long endTime = System.currentTimeMillis();
log.info("sync_into_test (1100001) 综资同步到无线资源,当前网元同步网元【{}】,同步5000条耗时【{}】毫秒," +
"查询分页开始【{}】,查询分页结束【{}】,同步时间范围【{}】", netEleType, endTime - startTime,
startRow, endRow, lastUpdateTime);
return completableFuture;
} catch (Exception e) {
log.info("sync_fail (6100009) 综资同步到无线资源,正在同步当前网元【{}】,异常错误信息【{}】",
netEleType, e.getMessage());
e.printStackTrace();
}
return null;
}
@Configuration
@Slf4j
public class ThreadExecutorConfig {
/**
* 配置亿阳综资同步无线资源接口线程池
*
* @param
* @return java.util.concurrent.Executor
* @author liangzhaolin
* @date 2020/8/2 17:31
*/
@Bean("yrSyncWrExecutor")
public Executor yrSyncWrExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(30);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(300);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(600);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("YrSyncWrThread-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程),这里是DiscardPolicy丢弃策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
/**
* 配置实时无线同步综资接口线程池
*
* @param
* @return java.util.concurrent.Executor
* @author linlianghong
* @date 2020/11/13
*/
@Bean("realYrSyncWrExecutor")
public Executor realYrSyncWrExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(15);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(600);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("RealYrSyncWrThread-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程),这里是DiscardPolicy丢弃策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
/**
* 配置线程池
* ThreadPoolTaskExecutor的处理流程:当池子大小小于corePoolSize,就新建线程,并处理请求
* 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
* 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
* 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
* 其会优先创建corePoolSize线程,当继续增加线程时,先放入Queue中,当 CorePoolSiz 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错
* 另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。
*
* @author liangzhaolin
* @date 2020/7/24 14:43
* @see "https://blog.csdn.net/csdn_pangxiong/article/details/103731613"
* @see "https://www.jianshu.com/p/14bde4e6f747"
*/
@Bean("syncOneNetEleTypeExecutor")
public Executor syncOneNetEleTypeExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(30);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(300);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(600);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("SyncOneNetEleTypeThread-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程),这里是DiscardPolicy丢弃策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)