JAVA多线程执行,等待返回结果,再执行

2023-11-20

JAVA多线程执行,等待返回结果,再执行

1.实现callable接口

1)配置线程池
package com.neusoft.demo.server.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置、启用异步
 * 
 * @author
 *
 */
@EnableAsync(proxyTargetClass = true)
@Configuration
public class AsycTaskExecutorConfig {

	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
		taskExecutor.setCorePoolSize(50);
		//最大线程数
		taskExecutor.setMaxPoolSize(100);
		//最大队列数
        taskExecutor.setQueueCapacity(1000);
        // 线程的空闲时间
        taskExecutor.setKeepAliveSeconds(100);
        //线程前缀
        //taskExecutor.setThreadNamePrefix("asyncTaskExecutor-");
        //拒绝策略交给主线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		return taskExecutor;
	}
}

2)编写线程实现callable接口
package com.neusoft.demo.server.thread;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.neusoft.demo.server.model.TrafficFlow;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * @author dume
 * @create 2021-10-26 9:44
 **/
@Scope("prototype")
@Configuration
public class ThreadTaskProcess implements Callable {
    private static final Logger log = LoggerFactory.getLogger("adminLogger");
    @Qualifier("MyHighLevelClient")
    @Autowired
    private RestHighLevelClient rhlClient;
    @Value("${elasticsearch.motor-vehicle-node-name}")
    private String motorVehicleNode;

    private int searchnum;
    private String EndTime;
    private String BeginTime;
    private String Precision;
    private String DeviceCityCode;
    private String DeviceID;
    private String PlateColor;
    private String Direction;


    public ThreadTaskProcess getInstance(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction,int searchnum) {
        this.EndTime = EndTime;
        this.BeginTime = BeginTime;
        this.Precision = Precision;
        this.DeviceCityCode = DeviceCityCode;
        this.DeviceID = DeviceID;
        this.PlateColor = PlateColor;
        this.Direction = Direction;
        this.searchnum = searchnum;
        return this;
    }
    @Override
    public Object call() throws Exception {
        //es请求
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder bqb = QueryBuilders.boolQuery();
        if (BeginTime != null && EndTime != null) {
            RangeQueryBuilder rqb = QueryBuilders.rangeQuery("PassTime");
            rqb.gte(BeginTime);
            rqb.lt(EndTime);
            rqb.format("yyyyMMddHHmmss");
            bqb.must(rqb);
        }
        if(StringUtils.isNotBlank(DeviceCityCode)){
            if(DeviceCityCode.contains(",")){
                String[] strings = DeviceCityCode.split(",");
                TermsQueryBuilder tqb = QueryBuilders.termsQuery("DeviceCityCode",strings);
                bqb.must(tqb);
            }else{
                TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceCityCode",DeviceCityCode);
                bqb.must(tqb);
            }

        }
        if(StringUtils.isNotBlank(DeviceID)){
            TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceID",DeviceID);
            bqb.must(tqb);
        }
        if(StringUtils.isNotBlank(PlateColor)){

            if(PlateColor.length()==1){
                TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor",PlateColor);
                bqb.must(tqb);
            }else{
                TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor","6");
                bqb.mustNot(tqb);
            }
        }
        if(StringUtils.isNotBlank(Direction)){
            TermQueryBuilder tqb = QueryBuilders.termQuery("Direction",Direction);
            bqb.must(tqb);
        }

        sourceBuilder.query(bqb);
        sourceBuilder.size(0);
        sourceBuilder.timeout(new TimeValue(600000));
        DateHistogramAggregationBuilder fieldBuilder = AggregationBuilders
                .dateHistogram("articles_over_time")
                .field("PassTime")
                .dateHistogramInterval(new DateHistogramInterval(Precision+"m"))
                .format("yyyy/MM/dd HH:mm")
                .order(BucketOrder.key(true));
        SearchRequest searchRequest = new SearchRequest(motorVehicleNode).source(sourceBuilder.aggregation(fieldBuilder));
        SearchResponse searchResponse = null;
        JSONArray array = new JSONArray();
        JSONArray arrayback = new JSONArray();
        try {
            searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
            Histogram histo = searchResponse.getAggregations().get("articles_over_time");
            List<Histogram.Bucket> buckets = (List<Histogram.Bucket>)histo.getBuckets();
            array = JSONArray.parseArray(JSONArray.toJSONString(buckets));
            if(CollectionUtils.isNotEmpty(array)){
                for(int i=0;i<array.size();i++){
                    JSONObject object1 = new JSONObject();
                    object1.put("num",String.valueOf(i+1));
                    object1.put("datetime",array.getJSONObject(i).getString("keyAsString"));
                    object1.put("flownumber",array.getJSONObject(i).getString("docCount"));
                    arrayback.add(object1);
                }
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        TrafficFlow trafficFlow = new TrafficFlow();
        trafficFlow.setSearchnum(searchnum);
        if(CollectionUtils.isNotEmpty(arrayback)){
            List<TrafficFlow> list = JSONArray.parseArray(JSONArray.toJSONString(arrayback),TrafficFlow.class);
            trafficFlow.setTrafficFlows(list);
        }
        return trafficFlow;

    }




}

3)编写逻辑,线程执行完成后获取返回值
package com.neusoft.demo.server.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.neusoft.demo.server.model.BeginAndEndDate;
import com.neusoft.demo.server.model.TrafficFlow;
import com.neusoft.demo.server.service.VbdTrafficFlowService;
import com.neusoft.demo.server.thread.ThreadTaskProcess;
import com.neusoft.demo.server.utils.DateSplitUtils;
import com.neusoft.demo.server.utils.RedisUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;

/**
 * @author dume
 * @create 2021-10-20 17:54
 **/
@Service
public class VbdTrafficFlowServiceImpl implements VbdTrafficFlowService {
    private static final Logger log = LoggerFactory.getLogger("adminLogger");

    @Qualifier("MyHighLevelClient")
    @Autowired
    private RestHighLevelClient rhlClient;
    @Value("${elasticsearch.motor-vehicle-node-name}")
    private String motorVehicleNode;

    @Autowired
    protected ThreadPoolTaskExecutor executorService;

    @Autowired
    private ObjectFactory<ThreadTaskProcess> processFactory;

    private static SimpleDateFormat simpleDateFormatOne = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static SimpleDateFormat simpleDateFormatTwo = new SimpleDateFormat("yyyyMMddHHmmss");
    private static int  oneDayTimes = 24*60*60*1000;
    private static int fiveMinutes = 5*60*1000;
    private static int fifthyMinutes = 15*60*1000;
    private static int thirtyMinutes = 30*60*1000;
    private static int sixtyMinutes = 60*60*1000;

    @Override
    public JSONObject getTrafficFlow(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction) {
        JSONObject object = new JSONObject();
        List<TrafficFlow> trafficFlows = new ArrayList();
        /**
         * 时间段处理
         */
        int precisionMinutes =  getMinutes(Precision);
        Date start = new Date();
        Date end = new Date();
        List<BeginAndEndDate> beginAndEndDateList = new ArrayList<>();
        //时间分片数

        try{
            start = simpleDateFormatTwo.parse(BeginTime);
            end = simpleDateFormatTwo.parse(EndTime);
        }catch (Exception e){
            e.printStackTrace();
            log.error("时间转化失败",e.getMessage());
        }

        if(end.getTime()-start.getTime()<=oneDayTimes){

            BeginAndEndDate beginAndEndDate =  new BeginAndEndDate();
            beginAndEndDate.setBeginTime(BeginTime);
            beginAndEndDate.setEndTime(EndTime);
            beginAndEndDateList.add(beginAndEndDate);
        }else{
          //将时间转化为可以被精度整除的时间
            long startlong = start.getTime();
            long endlong = end.getTime();
            int precision = Integer.valueOf(Precision);
            precision = precision*60*1000;
            startlong = startlong%precision==0?startlong:startlong-(startlong%precision);
            endlong = endlong%precision==0?endlong:endlong+(precision-(endlong%precision));
            List<DateSplitUtils.DateSplit> dateSplits =
                    DateSplitUtils.splitDate(
                            new Date(startlong),
                            new Date(endlong),
                            DateSplitUtils.IntervalType.DAY,
                            1);
            for (DateSplitUtils.DateSplit dateSplit : dateSplits) {
                BeginAndEndDate beginAndEndDate =  new BeginAndEndDate();
                beginAndEndDate.setBeginTime(dateSplit.getStartDateTimeStr());
                beginAndEndDate.setEndTime(dateSplit.getEndDateTimeStr());
                beginAndEndDateList.add(beginAndEndDate);
            }
        }

        List<FutureTask<Object>> futureTasks = new ArrayList<FutureTask<Object>>();
        JSONObject data = new JSONObject();

        for(int i=0;i<beginAndEndDateList.size();i++){
            futureTasks.add(
                    new FutureTask<>(
                            processFactory.getObject().getInstance(
                                    beginAndEndDateList.get(i).getEndTime(),
                                    beginAndEndDateList.get(i).getBeginTime(),
                                    Precision,
                                    DeviceCityCode,
                                    DeviceID,
                                    PlateColor,
                                    Direction,
                                    i+1
                            )
                    )
            );
        }
        // 加入 线程池
        for (FutureTask<Object> futureTask : futureTasks) {
            executorService.submit(futureTask);
        }
        // 获取线程返回结果
        for (int i = 0; i < futureTasks.size(); i++) {
            try {
                TrafficFlow trafficFlow = (TrafficFlow) futureTasks.get(i).get();
                if(null!=trafficFlow){
                    trafficFlows.add(trafficFlow);
                }

            } catch (Exception e) {
                e.printStackTrace();
                log.error("多线程获取返回结果失败",e.getMessage());
            }
        }
        /**
         * 排序并去空
         */
        List<TrafficFlow> backTrafficFlows = new ArrayList();
        if(CollectionUtils.isNotEmpty(trafficFlows)){
            trafficFlows = trafficFlows
                    .stream()
                    .sorted(Comparator.comparing(TrafficFlow::getSearchnum))
                    .collect(Collectors.toList());
            for(TrafficFlow trafficFlow : trafficFlows){
                if(CollectionUtils.isNotEmpty(trafficFlow.getTrafficFlows())){
                    backTrafficFlows.addAll(trafficFlow.getTrafficFlows());
                }

            }
            int num =1;
            for(TrafficFlow trafficFlow : backTrafficFlows){
                trafficFlow.setNum(num);
                num++;
            }
        }
        data.put("dataList",backTrafficFlows);
        object.put("data",data);
        return object;
    }

    public int getMinutes( String Precision){
        int back;
        switch (Precision){
            case "5":
                back = fiveMinutes;
                break;
            case "15":
                back = fifthyMinutes;
                break;
            case "30":
                back = thirtyMinutes;
                break;
            case "60":
                back = sixtyMinutes;
                break;
            default:
                back = sixtyMinutes;
                break;

        }
        return back;
    }
}

2.实现Runnable, 使用计数器

// 创建线程
public class MyThread implements Runnable {
    private List<MyVo> myVos;
    private CountDownLatch latch;

    public MyThread(List<MyVo> myVos, CountDownLatch latch){
        this.myVos = myVos;
        this.latch = latch;
    }

    @Override
    public void run() {
        // 执行代码逻辑
		 /**
		 *写逻辑
		 */
	
	
		// 计数器计数
        latch.countDown();
    }
}



// 创建线程池
    ExecutorService executorService = new ThreadPoolExecutor(12, 24,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());


public void executorMothod(){

try {
	List<List<MyVo>> list = new ArrayList<>();
	// 创建计数器
    CountDownLatch latch = new CountDownLatch(list.size());
    for (List<MyVo> myVos : list) {
        executorService.submit(new MyThread(myVos, latch));
    }
    // 等待计数器执行完成再获取数据
    latch.await();
} catch (InterruptedException e) {
    log.error(e.getMessage());
}finally {
    if(executorService != null && !executorService.isShutdown()){
        executorService.shutdown();
    }
}
	
log.info("线程执行完毕!");


// 执行后续程序

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JAVA多线程执行,等待返回结果,再执行 的相关文章

随机推荐

  • 心理学的166个现象---之六

    101 拍球效应 拍篮球时 用的力越大 篮球就跳得越高 对学生的期望值越高 学生潜能的发挥就越充分 优秀的老师总是尽可能地信任学生 不断鼓励学生 而批评则尽可能委婉 不使矛盾激化 102 旁观者效应 1993年 四川达竹矿务局一名高考超过录
  • pytorch模型训练的若干问题

    1 Net input 调用的是什么函数 为什么直接写对象名就直接调用函数了 net是创建的vgg类的对象 vgg类继承于pytorch库中类nn Module 创建类时的括号里写上父类的名字 就是继承的意思 在pytorch库中nn Mo
  • QTableWidget 设置表头颜色

    QTableWidget 设置表头颜色 方法1 setStyleSheet QHeaderView section background color qlineargradient x1 0 y1 0 x2 0 y2 1 stop 0 00
  • android sdk自带的fragment标签使用

    项目开发中要用到 下面四个大分类 上面三个小分类的情况 大分类采用viewPage 小分类 使用了sdk自带的
  • 制造业软件体系结构与互联网的差异

    本人自毕业已经13年 虽然热爱计算机 但是由于种种原因 一直在东莞的工厂混迹 感受着互联网的大潮 也不免有几分失落 伴随这去年 今年大厂裁人 许多被逼无路的程序员开始跳槽制造业 浓浓的Java气息来了 在此不免吐槽一句 请不要把写互联网程序
  • ESP32-PICO-D4下载程序出现 rst:0x10 (RTCWDT_RTC_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT) flash read err, 1000

    备注 是我自己记录用的 有问题可以交流 用的Visual Studio Code Arduino platformio开发 最近现在在搞物联网 发现ESP32这款芯片容易上手 而且功能强大 买的开发板用起来很顺手 于是我就自己从立创开源上找
  • 解决cannot be cast to class jakarta.servlet.Servlet问题

    我的Tomcat版本是10 0 5 这个问题的主要原因是因为 10版本的Tomcat的servlet包变化了 解决问题方法 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 1下载对应的包并且导入 下
  • Prim算法解决修路问题

    普里姆算法 Prim算法 图论中的一种算法 可在加权连通图里搜索最小生成树 意即由此算法搜索到的边子集所构成的树中 不但包括了连通图里的所有顶点 英语 Vertex graph theory 且其所有边的权值之和亦为最小 普里姆算法和Kru
  • storm集成kafka简单使用示例2

    StormKafkaTopo java package stormUse stormUse import java util Properties import org apache storm Config import org apac
  • 9.2 单片机上下拉电阻

    前边似乎我们很多次提到了上拉电阻 下拉电阻 具体到底什么样的电阻算是上下拉电阻 上下拉电阻都有何作用呢 上拉电阻就是将不确定的信号通过一个电阻拉到高电平 同时此电阻也起到一个限流作用 下拉就是下拉到低电平
  • app id(wildcard ID和explicit ID)

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 最近做ios游戏的平台相关的工作 平台商要求把我们产品的bundle id加上他们的标记 比如我们的bundle id叫 com lc test 如果我上CSDN的平台 就
  • CleanMyMac X4.14.1苹果Mac电脑系统最好用的系统清理工具

    macOS 平台的知名系统清理应用 CleanMyMac 在经历了一段时间的beta测试后 全新设计的 CleanMyMac X 正式上线 与 CleanMyMac3相比 新版本的 UI 设计焕然一新 采用了完全不同的风格 除了设计上的变化
  • gdb attach 方法

    第一步 获得正在运行的进程的进程号 程序编译时要有 g参数 第二步 gdb attach 根据上一步获得进程号 现在attach上去 此处可stop暂停程序 第三步 打断点 gdb有两种打断点的方式 b 行号 如果是当前文件 则直接加上行号
  • 用wordpress编辑网站使页面中的图片全屏展示和全屏轮播展示

    在利用wordpress建立网站中 页面中的bannner图如何使其全屏展示以及如何添加轮播图 一 页面中的图片如何设置为全屏图片展示 操作步骤如下 1 打开网站的后台 点击 页面 选择所有页面 如图所示 2 选择相应的页面 点击 使用El
  • nacos简易实现负载均衡

    目录 一 什么是Nacos 二 Nacos下载和安装 1 使用Windows启动 2 验证nacos是否成功启动 三 Nacos Discovery服务注册 发现 四 简易实现负载均衡 1 注册者配置 2 注册者启动类 3 注册者业务层 4
  • 数组添加进formdata_FormData使用方法详解

    FormData的主要用途有两个 1 将form表单元素的name与value进行组合 实现表单数据的序列化 从而减少表单元素的拼接 提高工作效率 2 异步上传文件 一 创建formData对象 1 创建一个空对象 通过FormData构造
  • Linux命令_sort & 排序、去重

    目录 1 语法 1 1 常用参数 2 常见用法 2 1 按数值排序 2 2 按文件大小排序 2 3 指定某一列排序 2 4 去重后排序 2 5 生成随机数 2 6 同时查看多个文件 2 7 排序后的值写入文件 可直接修改文件 1 语法 so
  • 如何使用区块链技术保护个人隐私和数据安全

    区块链技术是一种分布式账本技术 它具有不可篡改 去中心化 透明度高等特点 区块链技术能够实现数据的可信存证 隐私保护和交易安全 并且能够通过智能合约的自动执行 因此被广泛应用于金融 电商 物流 社交网络等领域 区块链技术的核心是 分布式账本
  • Go语言List的使用与数据结构的选择

    container包下的函数 heap heap包提供了对任意类型 实现了heap Interface接口 的堆操作 list list包实现了双向链表 ring ring实现了环形链表的操作 一 List的使用 List列表是一种非连续存
  • JAVA多线程执行,等待返回结果,再执行

    JAVA多线程执行 等待返回结果 再执行 1 实现callable接口 1 配置线程池 package com neusoft demo server config import org springframework context an