ThreadPoolTaskExecutor
使用ThreadPoolTaskExecutor解决高并发性的问题
1、创建一个ThreadPoolTaskExecutor
2、往线程里面添加FutureTask对象,
3、然后等待FutureTask对象返回结果,根据结果进行显示处理的失败与成功
1、创建一个ThreadPoolTaskExecutor
<!-- spring thread pool executor -->
<bean id="dataSendExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="20" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="30" />
<!-- 缓存队列 -->
<property name="queueCapacity" value="30" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
@Resource
private ThreadPoolTaskExecutor dataSendExecutor;
2、往线程里面添加FutureTask对象,
package com.newsoft.datasend.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import com.newsoft.datasend.service.SendService;
import com.newsoft.datasend.thread.ThreadSplitCard;
import com.newsoft.datasend.thread.ThreadSplitCardNew;
import com.newsoft.datasend.util.ConfigProperties;
import com.newsoft.datasend.util.SplitCardDataUtil;
import com.newsoft.datasend.util.SplitCardNewDataUtil;
import com.newsoft.entity.Tscointerface;
import com.newsoft.enums.SystemEnum;
import com.newsoft.enums.Topic_To_Table;
import com.newsoft.mapper.TsUploadNotifyMapper;
/**
* ETC出口通行交易精确拆分结果
* @author 吴立碧
*
*/
@Service
public class SplitCardNewServiceImpl implements SendService {
private static Logger logger = LoggerFactory.getLogger(SplitCardNewServiceImpl.class);
@Resource
private TsUploadNotifyMapper dataSendMapper;
@Resource
private ThreadPoolTaskExecutor dataSendExecutor;
@Resource
private SplitCardNewDataUtil dataUtil;
@Override
public int doSend() throws Exception {
String topicName= Topic_To_Table.KA_CARDSPLIT_NEW.getTopicName();
Integer threadSize = Integer.valueOf(ConfigProperties.getConfigProperty("SENDTHREADSIZE"));
//Integer threadSize = 1;
int statuscode = 0;
Integer tableCode = Topic_To_Table.KA_CARDSPLIT_NEW.getTableCode();
List<Tscointerface> data = new ArrayList<Tscointerface>();
Tscointerface aTscointerface = new Tscointerface();
aTscointerface.setSendCode(SystemEnum.tb.getSystemNo());
aTscointerface.setSendName(SystemEnum.tb.getSystemCode());
aTscointerface.setInterfaceCode(tableCode);
data.add(aTscointerface);
Map<String, Object> map = new HashMap<String, Object>();
map.put("topicName", topicName);
List<FutureTask<Integer>> listFT = new ArrayList<>();
// 多线程用来解决并发
List<String> sendList = new ArrayList<String>();
for (Tscointerface po : data) {
sendList.add(po.getSendName());
}
// ConsumerUtil.initSplitCardKafkaConsumer(Topic_To_Table.KA_CARDSPLIT.getGroupName());
for (Tscointerface po : data) {
for (int i = 0; i < threadSize; i++) {
map.put("sendList", sendList);
map.put("tscf", po);
Map<String, Object> map1 = new HashMap<>(map);
ThreadSplitCardNew te = new ThreadSplitCardNew(dataUtil,dataSendMapper, logger, map1);
FutureTask<Integer> futureTask = new FutureTask<Integer>(te);
dataSendExecutor.execute(futureTask);
listFT.add(futureTask);
}
}
// 获取数据处理结果
for (int j = 0; j < listFT.size(); j++) {
try {
// 如果返回是异常,那么就将返回值置为1
if (listFT.get(j).get(1, TimeUnit.HOURS) == 1) {
statuscode = 1;
// flag = false;
}
} catch (InterruptedException e) {
logger.error("中断异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
} catch (ExecutionException e) {
logger.error("执行异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
} catch (TimeoutException e) {
logger.error("超时异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
}
}
return statuscode;
}
}
3、然后等待FutureTask对象返回结果,
package com.newsoft.datasend.thread;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.newsoft.datasend.util.SplitCardDataUtil;
import com.newsoft.datasend.util.SplitCardNewDataUtil;
import com.newsoft.mapper.TsUploadNotifyMapper;
/**
* ETC出口通行交易精确拆分结果
* @author 吴立碧
*
*/
public class ThreadSplitCardNew implements Callable<Integer> {
SplitCardNewDataUtil dataUtil;
TsUploadNotifyMapper dataSendMapper;
Logger logger;
Map<String, Object> map;
public ThreadSplitCardNew(SplitCardNewDataUtil dataUtil,TsUploadNotifyMapper dataSendMapper,
Logger logger, Map<String, Object> map) {
this.dataUtil = dataUtil;
this.dataSendMapper = dataSendMapper;
this.logger = logger;
this.map = map;
}
//FutureTask对象返回结果
@Override
@Transactional(value = "transactionManager_mapper", rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public Integer call() throws Exception {
try {
dataUtil.pressSendData(dataSendMapper, map);
} catch (Exception e) {
StringWriter errorsWriter = new StringWriter();
e.printStackTrace(new PrintWriter(errorsWriter));
logger.error(errorsWriter.toString());
return 1;
}
return 0;}
}
根据结果进行显示处理的失败与成功
// 获取数据处理结果
for (int j = 0; j < listFT.size(); j++) {
try {
// 如果返回是异常,那么就将返回值置为1
if (listFT.get(j).get(1, TimeUnit.HOURS) == 1) {
statuscode = 1;
// flag = false;
}
} catch (InterruptedException e) {
logger.error("中断异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
} catch (ExecutionException e) {
logger.error("执行异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
} catch (TimeoutException e) {
logger.error("超时异常:" + e.getMessage());
// logger.error("异常堆栈:"+LogExceptionStackUtil.LogExceptionStack(e));
// XxlJobLogger.log(">>>>>>同步出错,信息为:{0}",LogExceptionStackUtil.LogExceptionStack(e));
statuscode = 1;
return statuscode;
}