赫拉(hera)分布式任务调度系统

2023-11-17

相关介绍

1.实现集群HA,机器宕机环境实现机器断线重连与心跳恢复与hera集群HA,节点单点故障环境下任务自动恢复,master断开,work抢占master

@Component
public class DistributeLock {


    @Autowired
    private HeraHostRelationService hostGroupService;
    @Autowired
    private HeraLockService heraLockService;

    @Autowired
    private WorkClient workClient;

    @Autowired
    private HeraSchedule heraSchedule;

    private final long timeout = 1000 * 60 * 5L;

    private final String ON_LINE = "online";

    @PostConstruct
    public void init() {

        workClient.workSchedule.scheduleAtFixedRate(() -> {
            try {
                checkLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 10, 60, TimeUnit.SECONDS);
    }

    public void checkLock() {
        HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
        if (heraLock == null) {
            Date date = new Date();
            heraLock = HeraLock.builder()
                    .id(1)
                    .host(WorkContext.host)
                    .serverUpdate(date)
                    .subgroup(ON_LINE)
                    .gmtCreate(date)
                    .gmtModified(date)
                    .build();
            Integer lock = heraLockService.insert(heraLock);
            if (lock == null || lock <= 0) {
                return;
            }
        }

        if (WorkContext.host.equals(heraLock.getHost().trim())) {
            heraLock.setServerUpdate(new Date());
            heraLockService.update(heraLock);
            HeraLog.info("hold lock and update time");
            heraSchedule.startup();
        } else {
            long currentTime = System.currentTimeMillis();
            long lockTime = heraLock.getServerUpdate().getTime();
            long interval = currentTime - lockTime;
            if (interval > timeout && isPreemptionHost()) {
                Date date = new Date();
                Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
                if (lock != null && lock > 0) {
                    ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);
                    heraSchedule.startup();
                    heraLock.setHost(WorkContext.host);
                    //TODO  接入master切换通知
                } else {
                    HeraLog.info("master抢占失败,由其它worker抢占成功");
                }
            } else {
                //非主节点,调度器不执行
                heraSchedule.shutdown();
            }
        }
        workClient.init();
        try {
            workClient.connect(heraLock.getHost().trim());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 检测该ip是否具有抢占master的权限
     * @return 是/否
     */
    private boolean isPreemptionHost() {
        List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnvironment.preemptionMasterGroup);
        if (preemptionHostList.contains(WorkContext.host)) {
            return true;
        } else {
            HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
            return false;
        }
    }
}

1.通过PostConstruct在项目启动的时候触发定时任务。。

2.定时任务主要通过数库来进行强占master

3.如果是master,定时任务做的事情就是update  heraLock  ServerUpdate

4.如果是worker,定时任务做的事情就是查看master是不是长久没更新ServerUpdate,然后强占

5. heraSchedule.startup();workClient.init();workClient.connect(heraLock.getHost().trim()) 这三方法里面有状态,会置空操作。

2.masterServer与workClient启动及交互

masterServer的启动入口为,heraSchedule.startup()-->masterxContext.init()

 public void init() {
        threadPool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
        masterSchedule = new ScheduledThreadPoolExecutor(5, new NamedThreadFactory("master-schedule", false));
        masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
        masterSchedule.allowCoreThreadTimeOut(true);
        this.getQuartzSchedulerService().start();
        dispatcher = new Dispatcher();
        handler = new MasterHandler(this);//主handler
        masterServer = new MasterServer(handler);//主通讯类
        masterServer.start(HeraGlobalEnvironment.getConnectPort());//启动
        master.init(this);
        HeraLog.info("end init master content success ");
    }

而MasterSever主要关注MasterHandler

  public MasterHandler(MasterContext masterContext) {
        this.masterContext = masterContext;
        //把处理结果统一管理
        completionService = new ExecutorCompletionService<>(
                new ThreadPoolExecutor(
                        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-execute", false), new ThreadPoolExecutor.AbortPolicy()));
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("master-deal", false), new ThreadPoolExecutor.AbortPolicy());
        executor.execute(() -> {
                    Future<ChannelResponse> future;
                    ChannelResponse response;
                    while (true) {//一量处理有结果,即时响应
                        try {
                            future = completionService.take();
                            response = future.get();
                            TaskLog.info("3-1.MasterHandler:-->master prepare send status : {}", response.webResponse.getStatus());
                            response.channel.writeAndFlush(wrapper(response.webResponse));
                            TaskLog.info("3-2.MasterHandler:-->master send response success, requestId={}", response.webResponse.getRid());
                        } catch (Exception e) {
                            ErrorLog.error("master handler future take error:{}", e);
                            e.printStackTrace();
                        } catch (Throwable throwable) {
                            ErrorLog.error("master handler future take throwable{}", throwable);
                            throwable.printStackTrace();
                        }
                    }
                }
        );
    }

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketMessage socketMessage = (SocketMessage) msg;
        Channel channel = ctx.channel();
        switch (socketMessage.getKind()) {
            //心跳
            case REQUEST://如果是内置任务无需处理结果
                Request request = Request.newBuilder().mergeFrom(socketMessage.getBody()).build();
                switch (request.getOperate()) {
                    case HeartBeat:
                        masterContext.getThreadPool().execute(() -> MasterHandleRequest.handleHeartBeat(masterContext, channel, request));
                        break;
                    case SetWorkInfo:
                       //此处省略代码。。
                        break;
                }
                break;
            case WEB_REQUEST://web任务统处理,结果由专门线程返回
                final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
                switch (webRequest.getOperate()) {
                    case ExecuteJob:
                      //此处省略代码。。。
                break;
            case RESPONSE://对于客户端内置请求响应结果交给监听器
                masterContext.getThreadPool().execute(() -> {
                    Response response = null;
                    try {
                        response = Response.newBuilder().mergeFrom(socketMessage.getBody()).build();

                        SocketLog.info("6.MasterHandler:receiver socket info from work {}, response is {}", ctx.channel().remoteAddress(), response.getRid());
                        for (ResponseListener listener : listeners) {
                            listener.onResponse(response);
                        }
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                });

                break;
            case WEB_RESPONSE://对于客户端WEb请求响应结果交给监听器
                masterContext.getThreadPool().execute(() -> {
                    WebResponse webResponse = null;
                    try {
                        webResponse = WebResponse.newBuilder().mergeFrom(socketMessage.getBody()).build();
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                    SocketLog.info("6.MasterHandler:receiver socket info from work {}, webResponse is {}", ctx.channel().remoteAddress(), webResponse.getRid());
                    for (ResponseListener listener : listeners) {
                        listener.onWebResponse(webResponse);
                    }
                });
                break;
            default:
                ErrorLog.error("unknown request type : {}", socketMessage.getKind());
                break;
        }
    }

//再看看监听器,任务里面有个ID号可以对应起结果
public class WorkResponseListener extends ResponseListenerAdapter {


    private RpcWebRequest.WebRequest request;
    private volatile Boolean receiveResult;
    private CountDownLatch latch;
    private RpcWebResponse.WebResponse webResponse;

    @Override
    public void onWebResponse(RpcWebResponse.WebResponse response) {
        if (request.getRid() == response.getRid()) {
            try {
                webResponse = response;
                receiveResult = true;
            } catch (Exception e) {
                ErrorLog.error("work release exception {}", e);
            } finally {
                latch.countDown();
            }
        }
    }
}

workclient的设计方式与masterServer设计方式如出一辙,具体查看WorkHandler类

  1. 把有需要结果的语法统一交给completionService,统一返回
  2. 针对响应结果采用监听器的方式,通知相关业务

3.支持master/work 负载,内存,进程,cpu信息的可视化查看

//workClient中init方法,定时上报workerHandlerHeartBeat 
workSchedule.schedule(new Runnable() {

            private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
            private int failCount = 0;

            @Override
            public void run() {
                try {
                    if (workContext.getServerChannel() != null) {
                        //这是主方法
                        boolean send = workerHandlerHeartBeat.send(workContext);
                        if (!send) {
                            failCount++;
                            ErrorLog.error("send heart beat failed ,failCount :" + failCount);
                        } else {
                            failCount = 0;
                            HeartLog.info("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());
                        }
                    } else {
                        ErrorLog.error("server channel can not find on " + WorkContext.host);
                    }
                } catch (Exception e) {
                    ErrorLog.error("heart beat error:", e);
                } finally {
                    workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);
                }
            }
        }, HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);


//主要send方法
public boolean send(WorkContext context) {
        try {
            MemUseRateJob memUseRateJob = new MemUseRateJob(1);
            //内存信息获取
            memUseRateJob.readMemUsed();
            CpuLoadPerCoreJob loadPerCoreJob = new CpuLoadPerCoreJob();
            //负载信息获取
            loadPerCoreJob.run();
            //构建包体
            RpcHeartBeatMessage.HeartBeatMessage hbm = RpcHeartBeatMessage.HeartBeatMessage.newBuilder()
                    .setHost(WorkContext.host)
                    .setMemTotal(memUseRateJob.getMemTotal())
                    .setMemRate(memUseRateJob.getRate())
                    .setCpuLoadPerCore(loadPerCoreJob.getLoadPerCore())
                    .setTimestamp(System.currentTimeMillis())
                    .addAllDebugRunnings(context.getDebugRunning().keySet())
                    .addAllManualRunnings(context.getManualRunning().keySet())
                    .addAllRunnings(context.getRunning().keySet())
                    .setCores(WorkContext.cpuCoreNum)
                    .build();     context.getServerChannel().writeAndFlush(RpcSocketMessage.SocketMessage.newBuilder().
                    setKind(RpcSocketMessage.SocketMessage.Kind.REQUEST).
                    setBody(RpcRequest.Request.newBuilder().
                            setRid(AtomicIncrease.getAndIncrement()).
                            setOperate(RpcOperate.Operate.HeartBeat).
                            setBody(hbm.toByteString()).
                            build().toByteString()).
                    build());
        } catch (RemotingException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

//master对于heartBeat的处理方式
 public static void handleHeartBeat(MasterContext masterContext, Channel channel, Request request) {
        //放到master服务器的内存中,等待WEB应用的调用 
        MasterWorkHolder workHolder = masterContext.getWorkMap().get(channel);
        HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
        HeartBeatMessage heartBeatMessage;
        try {
            heartBeatMessage = HeartBeatMessage.newBuilder().mergeFrom(request.getBody()).build();
            heartBeatInfo.setHost(heartBeatMessage.getHost());
            heartBeatInfo.setMemRate(heartBeatMessage.getMemRate());
            heartBeatInfo.setMemTotal(heartBeatMessage.getMemTotal());
            heartBeatInfo.setCpuLoadPerCore(heartBeatMessage.getCpuLoadPerCore());
            heartBeatInfo.setRunning(heartBeatMessage.getRunningsList());
            heartBeatInfo.setDebugRunning(heartBeatMessage.getDebugRunningsList());
            heartBeatInfo.setManualRunning(heartBeatMessage.getManualRunningsList());
            heartBeatInfo.setTimestamp(heartBeatMessage.getTimestamp());
            heartBeatInfo.setCores(heartBeatMessage.getCores());
            workHolder.setHeartBeatInfo(heartBeatInfo);
            HeartLog.info("received heart beat from {} : {}", heartBeatMessage.getHost(), JSONObject.toJSONString(heartBeatInfo));
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

1.work启动时,启动定时任务,专们负责收集数据,并发送给master

2.master,收到信息存在自己内存中,等待web调用并返回

4.任务添加与执行过程

//研发中心立即执行
@RequestMapping(value = "/debugSelectCode", method = RequestMethod.POST)
    @ResponseBody
    public WebAsyncTask<JsonResponse> debugSelectCode(@RequestBody HeraFile heraFile) {

        String owner = getOwner();
        return new WebAsyncTask<JsonResponse>(HeraGlobalEnvironment.getRequestTimeout(), () -> {
            Map<String, Object> res = new HashMap<>(2);
            HeraFile file = heraFileService.findById(heraFile.getId());
            file.setContent(heraFile.getContent());
            String name = file.getName();
            String runType;
            HeraDebugHistory history = HeraDebugHistory.builder()
                    .fileId(file.getId())
                    .script(heraFile.getContent())
                    .startTime(new Date())
                    .owner(owner)
                    .hostGroupId(file.getHostGroupId() == 0 ? HeraGlobalEnvironment.defaultWorkerGroup : file.getHostGroupId())
                    .build();

            int suffixIndex = name.lastIndexOf(Constants.POINT);
            if (suffixIndex == -1) {
                return new JsonResponse(false, "无后缀名,请设置支持的后缀名[.sh .hive .spark]");
            }
            String suffix = name.substring(suffixIndex);
            //只支持shell,hive,spark  
            if ((Constants.HIVE_SUFFIX).equalsIgnoreCase(suffix)) {
                runType = Constants.HIVE_FILE;
            } else if ((Constants.SHELL_SUFFIX).equalsIgnoreCase(suffix)) {
                runType = Constants.SHELL_FILE;
            } else if ((Constants.SPARK_SUFFIX).equalsIgnoreCase(suffix)) {
                runType = Constants.SPARK_FILE;
            } else {
                return new JsonResponse(false, "暂未支持的后缀名[" + suffix + "],请设置支持的后缀名[.sh .hive .spark]");
            }
            history.setRunType(runType);
            String newId = debugHistoryService.insert(history);
            //向master提交任务 
            workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.DebugKind, newId);
            res.put("fileId", file.getId());
            res.put("debugId", newId);
            return new JsonResponse(true, "执行成功", res);
        });
    }


//master端对此任务的相关处理,则是加入DebugQueue,进入等待队列
 public void debug(HeraDebugHistoryVo debugHistory) {
        JobElement element = JobElement.builder()
                .jobId(debugHistory.getId())
                .hostGroupId(debugHistory.getHostGroupId())
                .build();
        debugHistory.setStatus(StatusEnum.RUNNING);
        debugHistory.setStartTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        debugHistory.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 进入任务队列");
        masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(debugHistory));
        try {
            masterContext.getDebugQueue().put(element);
        } catch (InterruptedException e) {
            ErrorLog.error("添加开发中心执行任务失败:" + element.getJobId(), e);
        }
    }


    /**
     * 在master.init的时候会,会启动定时任务
     * 扫描任务等待队列,取出任务去执行
     * 对于没有可运行机器的时,manual,debug任务重新offer到原队列
     */
    public boolean scan() throws InterruptedException {
        boolean hasTask = false;
        if (!masterContext.getScheduleQueue().isEmpty()) {
            代码省略
        }

        if (!masterContext.getManualQueue().isEmpty()) {
            //代码省略
        }

        if (!masterContext.getDebugQueue().isEmpty()) {
            //拿出任务
            JobElement jobElement = masterContext.getDebugQueue().take();
            MasterWorkHolder selectWork = getRunnableWork(jobElement);
            if (selectWork == null) {
                masterContext.getDebugQueue().put(jobElement);
                ScheduleLog.warn("can not get work to execute DebugQueue job in master,job is:{}", jobElement.toString());
            } else {
                //当有机器的时候,直接给机子分配任务(向work发送任务)
                runDebugJob(selectWork, jobElement.getJobId());
                hasTask = true;
            }

        }
        return hasTask;

    }


 /**
     * worker接收到命令,JobUtils.createDebugJob创建job文件到服务器,拼接shell,并调用命令执行
     */
    private Future<RpcResponse.Response> debug(WorkContext workContext, RpcRequest.Request request) {
        RpcDebugMessage.DebugMessage debugMessage = null;
        try {
            debugMessage = RpcDebugMessage.DebugMessage.newBuilder().mergeFrom(request.getBody()).build();
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        String debugId = debugMessage.getDebugId();
        HeraDebugHistoryVo history = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
        return workContext.getWorkExecuteThreadPool().submit(() -> {
            int exitCode = -1;
            Exception exception = null;
            ResponseStatus.Status status;
            try {

                history.setExecuteHost(WorkContext.host);
                workContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));

                String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
                File directory = new File(HeraGlobalEnvironment.getWorkDir() + File.separator + date + File.separator + "debug-" + debugId);
                if (!directory.exists()) {
                    if (directory.mkdirs()) {
                        HeraLog.error("创建文件失败:" + directory.getAbsolutePath());
                    }
                }
                Job job = JobUtils.createDebugJob(new JobContext(JobContext.DEBUG_RUN), BeanConvertUtils.convert(history),
                        directory.getAbsolutePath(), workContext);
                workContext.getDebugRunning().putIfAbsent(debugId, job);
                exitCode = job.run();
            } catch (Exception e) {
                exception = e;
                history.getLog().appendHeraException(e);
            } finally {
                HeraDebugHistoryVo heraDebugHistoryVo = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
                heraDebugHistoryVo.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

                StatusEnum statusEnum = getStatusFromCode(exitCode);
                if (exitCode == 0) {
                    status = ResponseStatus.Status.OK;
                    heraDebugHistoryVo.setStatus(statusEnum);
                } else {
                    status = ResponseStatus.Status.ERROR;
                    heraDebugHistoryVo.setStatus(statusEnum);
                }
                workContext.getHeraDebugHistoryService().updateStatus(BeanConvertUtils.convert(heraDebugHistoryVo));
                HeraDebugHistoryVo debugHistory = workContext.getDebugRunning().get(debugId).getJobContext().getDebugHistory();
                workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(debugHistory));
                workContext.getDebugRunning().remove(debugId);
            }
            String errorText = "";
            if (exception != null && exception.getMessage() != null) {
                errorText = exception.getMessage();
            }
            return RpcResponse.Response.newBuilder()
                    .setRid(request.getRid())
                    .setOperate(RpcOperate.Operate.Debug)
                    .setStatusEnum(status)
                    .setErrorText(errorText)
                    .build();
        });
    }
  1. web把任务请求统一提交到master,master加到队列中
  2. master的后台进程去队列中拿任务
  3. 用scan选择work
  4. work接收到任务,执行对应脚本job.

5.服务降级,负载均衡

 /**
     * 获取hostGroupId中可以分发任务的worker
     */
    private MasterWorkHolder getRunnableWork(JobElement jobElement) {
        //根据算法选择对应的work
        MasterWorkHolder selectWork = loadBalance.select(jobElement, masterContext);
        if (selectWork == null) {
            return null;
        }
        Channel channel = selectWork.getChannel().getChannel();
        HeartBeatInfo beatInfo = selectWork.getHeartBeatInfo();
        // 如果最近两次选择的work一致  需要等待机器最新状态发来之后(睡眠)再进行任务分发
        if (HeraGlobalEnvironment.getWarmUpCheck() > 0 && lastWork != null && channel == lastWork && (beatInfo.getCpuLoadPerCore() > 0.6F || beatInfo.getMemRate() > 0.7F)) {
            ScheduleLog.info("达到预热条件,睡眠" + HeraGlobalEnvironment.getWarmUpCheck() + "秒");
            try {
                TimeUnit.SECONDS.sleep(HeraGlobalEnvironment.getWarmUpCheck());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lastWork = null;
            return null;
        }
        lastWork = channel;
        return selectWork;
    }
  1. 先去数据库(缓存)查询任务对应分组机子
  2. 与在线机子做差集
  3. 在master为任务选择work的时候,项目分别采用了轮询及随机算法来实现

6.支持日志的实时滚动

 /**
         * 定时 刷新日志到数据库
         */
        workSchedule.scheduleWithFixedDelay(new Runnable() {
          
            @Override
            public void run() {
                try {
                    for (Job job : workContext.getRunning().values()) {
                        //处理日志
                    }

                    for (Job job : workContext.getManualRunning().values()) {
                     //处理日志
                    }

                    for (Job job : workContext.getDebugRunning().values()) {
                        try {
                             //服务在运行中会向这类不断导入日志
                            HeraDebugHistoryVo history = job.getJobContext().getDebugHistory();
                            workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(history));
                        } catch (Exception e) {
                            printDebugLog(job, e);
                        }
                    }
                } catch (Exception e) {
                    ErrorLog.error("job log flush exception:{}", e.toString());
                }

            }
        }, 0, 5, TimeUnit.SECONDS);
    }

1.脚本在运行中的时候,work都会为每个脚本维护一个HeraJobHistoryVo,其中  LogContent log属性会和服务输入输出流对应

2.work启动的时候,会启动一个定时刷新日志在数据库的进程

3.web端通个查询数据库很好的达到时实

7.hera事件模型dispatch

  /**
     * 事件广播,每次任务状态变化,触发响应事件,全局广播,自动调度successEvent,触发依赖调度一些依赖更新
     *
     * @param applicationEvent
     */
    public void dispatch(ApplicationEvent applicationEvent) {
        try {
            //封装事件
            MvcEvent mvcEvent = new MvcEvent(this, applicationEvent);
            mvcEvent.setApplicationEvent(applicationEvent);
            //前置监听者
            if (fireEvent(beforeDispatch, mvcEvent)) {
                List<AbstractHandler> jobHandlersCopy = Lists.newArrayList(jobHandlers);
                for (AbstractHandler jobHandler : jobHandlersCopy) {
                    //是否可执行
                    if (jobHandler.canHandle(applicationEvent)) {
                        if (!jobHandler.isInitialized()) {
                            jobHandler.setInitialized(true);
                        }
                        //job触发
                        jobHandler.handleEvent(applicationEvent);
                    }
                }
                //后置监听者
                fireEvent(afterDispatch, mvcEvent);
            }
        } catch (Exception e) {
            ErrorLog.error("global dispatch job event error", e);
        }

    }

1.dispatch对于事件的主要处理顺序  前置监听 ---- jobHandler  ----- 后置监听

8.支持关注自己的任务,自动调度执行失败时会向负责人发送邮件

 private void runDebugJob(MasterWorkHolder selectWork, String debugId) {
        final MasterWorkHolder workHolder = selectWork;
        this.executeJobPool.execute(() -> {
            HeraDebugHistoryVo history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
            history.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 开始运行");
            masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));
            Exception exception = null;
            RpcResponse.Response response = null;
            Future<RpcResponse.Response> future = null;
            try {
                future = new MasterExecuteJob().executeJob(masterContext, workHolder, JobExecuteKind.ExecuteKind.DebugKind, debugId);
                response = future.get(HeraGlobalEnvironment.getTaskTimeout(), TimeUnit.HOURS);
            } catch (Exception e) {
                exception = e;
                if (future != null) {
                    future.cancel(true);
                }
                DebugLog.error(String.format("debugId:%s run failed", debugId), e);
            }
            boolean success = response != null && response.getStatusEnum() == ResponseStatus.Status.OK;
            if (!success) {
                exception = new HeraException(String.format("fileId:%s run failed ", history.getFileId()), exception);
                TaskLog.info("8.Master: debug job error");
                history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
                //创建失败事件
                HeraDebugFailEvent failEvent = HeraDebugFailEvent.builder()
                        .debugHistory(BeanConvertUtils.convert(history))
                        .throwable(exception)
                        .fileId(history.getFileId())
                        .build();
                //将失败事件通知关心的人
                masterContext.getDispatcher().forwardEvent(failEvent);
            } else {
                TaskLog.info("7.Master: debug success");
                 //创建成功事件
                HeraDebugSuccessEvent successEvent = HeraDebugSuccessEvent.builder()
                        .fileId(history.getFileId())
                        .history(BeanConvertUtils.convert(history))
                        .build();
                //将成功事件通知关心的人
                masterContext.getDispatcher().forwardEvent(successEvent);
            }
        });
    }


public void alarm(HeraJobFailedEvent failedEvent) {
        String actionId = failedEvent.getActionId();
        Integer jobId = ActionUtil.getJobId(actionId);
        if (jobId == null) {
            return;
        }
        HeraJob heraJob = heraJobService.findById(jobId);
        // 自己建立的任务运行失败必须收到告警
        if (heraJob.getAuto() != 1 && !Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
            return;
        }
        StringBuilder address = new StringBuilder();
        HeraUser user = heraUserService.findByName(heraJob.getOwner());
        address.append(user.getEmail().trim()).append(Constants.SEMICOLON);
        try {
            HeraJobMonitor monitor = heraJobMonitorService.findByJobIdWithOutBlank(heraJob.getId());
            if (monitor == null && Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
                ScheduleLog.info("任务无监控人,发送给owner:{}", heraJob.getId());

            } else if (monitor != null) {
                String ids = monitor.getUserIds();
                String[] id = ids.split(Constants.COMMA);
                for (String anId : id) {
                    if (StringUtils.isBlank(anId)) {
                        continue;
                    }
                    HeraUser monitor_user = heraUserService.findById(Integer.parseInt(anId));
                    if (monitor_user != null && monitor_user.getEmail() != null) {
                        address.append(monitor_user.getEmail().trim()).append(Constants.SEMICOLON);
                    }
                }
            }

            String title = "hera调度任务失败[任务=" + heraJob.getName() + "(" + heraJob.getId() + "),版本号=" + actionId + "]";
            String content = "任务ID:" + heraJob.getId() + Constants.HTML_NEW_LINE
                    + "任务名:" + heraJob.getName() + Constants.HTML_NEW_LINE
                    + "任务版本号:" + actionId + Constants.HTML_NEW_LINE
                    + "任务描述:" + heraJob.getDescription() + Constants.HTML_NEW_LINE
                    + "任务OWNER:" + heraJob.getOwner() + Constants.HTML_NEW_LINE;

            String errorMsg = failedEvent.getHeraJobHistory().getLog().getMailContent();
            if (errorMsg != null) {
                content += Constants.HTML_NEW_LINE + Constants.HTML_NEW_LINE + "--------------------------------------------" + Constants.HTML_NEW_LINE + errorMsg;
            }
            emailService.sendEmail(title, content, address.toString());
        } catch (MessagingException e) {
            e.printStackTrace();
            ErrorLog.error("发送邮件失败");
        }

1.在master.init中  有 masterContext.getDispatcher().addDispatcherListener(new HeraJobFailListener());这一代码。

     他关注job失败的事件,当master端收的失败响应,则会触发事件

2.任务失败事件的最终响应者是 EmailJobFailAlarm,发邮件通知责任人

9.生成版本

//isSingle是否全量生成版本
//jobId 如果是单个,其任务ID
private boolean generateAction(boolean isSingle, Integer jobId) {
        try {
            //防止全量重复调用, 
            if (isGenerateActioning) {
                return true;
            }
            DateTime dateTime = new DateTime();
            Date now = dateTime.toDate();
            int executeHour = dateTime.getHourOfDay();
            //凌晨生成版本,早上七点以后开始再次生成版本
            boolean execute = executeHour == 0 || (executeHour > ActionUtil.ACTION_CREATE_MIN_HOUR && executeHour <= ActionUtil.ACTION_CREATE_MAX_HOUR);
            if (execute || isSingle) {
                String currString = ActionUtil.getCurrHourVersion();
                if (executeHour == ActionUtil.ACTION_CREATE_MAX_HOUR) {
                    Tuple<String, Date> nextDayString = ActionUtil.getNextDayString();
                    //例如:今天 2018.07.17 23:50  currString = 201807180000000000 now = 2018.07.18 23:50
                    currString = nextDayString.getSource();
                    now = nextDayString.getTarget();
                }
                Long nowAction = Long.parseLong(currString);
                ConcurrentHashMap<Long, HeraAction> actionMap = new ConcurrentHashMap<>(heraActionMap.size());
                List<HeraJob> jobList = new ArrayList<>();
                //批量生成
                if (!isSingle) {
                    isGenerateActioning = true;
                    jobList = masterContext.getHeraJobService().getAll();
                } else { //单个任务生成版本
                    HeraJob heraJob = masterContext.getHeraJobService().findById(jobId);
                    jobList.add(heraJob);
                    actionMap = heraActionMap;
                    List<Long> shouldRemove = new ArrayList<>();
                    for (Long actionId : actionMap.keySet()) {
                        if (StringUtil.actionIdToJobId(String.valueOf(actionId), String.valueOf(jobId))) {
                            shouldRemove.add(actionId);
                        }
                    }
                    //移除内存所有依赖这个Job的老版本
                    shouldRemove.forEach(actionMap::remove);
                    List<AbstractHandler> handlers = new ArrayList<>(masterContext.getDispatcher().getJobHandlers());
                    移除内存所有依赖这个Job的老版本任务
                    if (handlers != null && handlers.size() > 0) {
                        for (AbstractHandler handler : handlers) {
                            JobHandler jobHandler = (JobHandler) handler;
                            if (StringUtil.actionIdToJobId(jobHandler.getActionId(), String.valueOf(jobId))) {
                                masterContext.getQuartzSchedulerService().deleteJob(jobHandler.getActionId());
                                masterContext.getDispatcher().removeJobHandler(jobHandler);
                            }
                        }
                    }
                }
                String cronDate = ActionUtil.getActionVersionPrefix(now);
                Map<Integer, List<HeraAction>> idMap = new HashMap<>(jobList.size());
                Map<Integer, HeraJob> jobMap = new HashMap<>(jobList.size());
                //生成新的版本,并且注入actionMap
                generateScheduleJobAction(jobList, cronDate, actionMap, nowAction, idMap, jobMap);
                //整理新的依赖关系  
                for (Map.Entry<Integer, HeraJob> entry : jobMap.entrySet()) {
                    generateDependJobAction(jobMap, entry.getValue(), actionMap, nowAction, idMap);
                }
                if (executeHour < ActionUtil.ACTION_CREATE_MAX_HOUR) {
                    heraActionMap = actionMap;
                }
                Dispatcher dispatcher = masterContext.getDispatcher();
                if (dispatcher != null) {
                    if (actionMap.size() > 0) {
                        for (Long id : actionMap.keySet()) {
                            dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));
                            //新加入的版本发送事件通知
                            if (id >= Long.parseLong(currString)) {
                                dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));
                            }
                        }
                    }
                }
                ScheduleLog.info("[单个任务:{},任务id:{}]generate action success", isSingle, jobId);
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            isGenerateActioning = false;
        }
        return false;
    }

  1. 把要生成版本号的JOB,从内存及调度中删除
  2. 生成新版本号JOB,整理依赖关系,加入内存到缓存中
  3. 把新版本的生成事件播放出去

9.支持任务的定时调度、依赖调度、手动调度、手动恢复

定时调度

       主要是根据cron表达式来解析该任务的执行时间,在达到触发时间时将该任务加入任务队列

/**
     * 自动任务的版本生成
     *
     * @param jobList   任务集合
     * @param cronDate  日期
     * @param actionMap actionMap集合
     * @param nowAction 生成版本时间的action
     * @param idMap     已经遍历过的idMap
     * @param jobMap    依赖任务map映射
     */
    public void generateScheduleJobAction(List<HeraJob> jobList, String cronDate, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap, Map<Integer, HeraJob> jobMap) {
        List<HeraAction> insertActionList = new ArrayList<>();
        for (HeraJob heraJob : jobList) {
            if (heraJob.getScheduleType() != null) {
                if (heraJob.getScheduleType() == 1) {
                    jobMap.put(heraJob.getId(), heraJob);
                } else if (heraJob.getScheduleType() == 0) {
                    //如果是定时任务,会解析表达式,拆分出多个版本
                    String cron = heraJob.getCronExpression();
                    List<String> list = new ArrayList<>();
                    if (StringUtils.isNotBlank(cron)) {
                        boolean isCronExp = CronParse.Parser(cron, cronDate, list);
                        if (!isCronExp) {
                            ErrorLog.error("cron parse error,jobId={},cron = {}", heraJob.getId(), cron);
                            continue;
                        }
                        List<HeraAction> heraAction = createHeraAction(list, heraJob);
                        idMap.put(heraJob.getId(), heraAction);
                        insertActionList.addAll(heraAction);
                    }
                } else {
                    ErrorLog.error("任务{}未知的调度类型{}", heraJob.getId(), heraJob.getScheduleType());
                }

            }
        }
        batchInsertList(insertActionList, actionMap, nowAction);

    }




//jobHandler中的Events.UpdateActions事件处理
    private void autoRecovery() {
        cache.refresh();
        HeraActionVo heraActionVo = cache.getHeraActionVo();
        //任务被删除
        if (heraActionVo == null) {
            masterContext.getDispatcher().removeJobHandler(this);
            destroy();
            ScheduleLog.info("heraAction 为空, 删除{}", actionId);
            return;
        }
        //自动调度关闭
        if (!heraActionVo.getAuto()) {
            destroy();
            return;
        }

        /**
         * 如果是依赖任务 原来可能是独立任务,需要尝试删除原来的定时调度
         * 如果是独立任务,则重新创建quartz调度
         *
         */
        if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Dependent) {
            destroy();
        } else if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
            try {
                createScheduleJob(masterContext.getDispatcher(), heraActionVo);
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }
    }


 /**
     * 创建定时任务
     *
     * @param dispatcher   the scheduler
     * @param heraActionVo the job name
     */

    private void createScheduleJob(Dispatcher dispatcher, HeraActionVo heraActionVo) throws SchedulerException {
        if (!ActionUtil.isCurrActionVersion(actionId)) {
            return;
        }
        JobKey jobKey = new JobKey(actionId, Constants.HERA_GROUP);
        if (masterContext.getQuartzSchedulerService().getScheduler().getJobDetail(jobKey) == null) {
            JobDetail jobDetail = JobBuilder.newJob(HeraQuartzJob.class).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().put("actionId", heraActionVo.getId());
            jobDetail.getJobDataMap().put("dispatcher", dispatcher);
            //TODO  根据任务区域加时区
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(heraActionVo.getCronExpression().trim())/*.inTimeZone()*/;
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(actionId, Constants.HERA_GROUP).withSchedule(scheduleBuilder).build();
            masterContext.getQuartzSchedulerService().getScheduler().scheduleJob(jobDetail, trigger);
            ScheduleLog.info("--------------------------- 添加自动调度成功:{}--------------------------", heraActionVo.getId());
        }
    }
  1. 在生成版本号的时候根据cron表达式生成多个版本
  2. 发出多版本变更通知
  3. jobHandler处理给对应新版本创建对应定时任务

依赖调度

       我们的任务大部分都有依赖关系,只有在上一个任务计算出结果后才能进行下一步的执行。我们的依赖任务会在所有的依赖任务都执行完成之后才会被触发加入任务队列

/**
     * 递归生成任务依赖action
     *
     * @param jobMap    任务映射map
     * @param heraJob   当前生成版本的任务
     * @param actionMap 版本map
     * @param nowAction 生成版本时间的action
     * @param idMap     job的id集合  只要已经检测过的id都放入idSet中
     */
    private void generateDependJobAction(Map<Integer, HeraJob> jobMap, HeraJob heraJob, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap) {
        if (heraJob == null || idMap.containsKey(heraJob.getId())) {
            return;
        }
        String jobDependencies = heraJob.getDependencies();
        if (StringUtils.isNotBlank(jobDependencies)) {
            Map<String, List<HeraAction>> dependenciesMap = new HashMap<>(1024);
            String[] dependencies = jobDependencies.split(Constants.COMMA);
            String actionMinDeps = "";
            boolean noAction = false;
            for (String dependentId : dependencies) {
                Integer dpId = Integer.parseInt(dependentId);
                //如果idSet不包含依赖任务dpId  则递归查找
                if (!idMap.containsKey(dpId)) {
                    generateDependJobAction(jobMap, jobMap.get(dpId), actionMap, nowAction, idMap);
                }
                List<HeraAction> dpActions = idMap.get(dpId);
                dependenciesMap.put(dependentId, dpActions);
                if (dpActions == null || dpActions.size() == 0) {
                    ErrorLog.warn("{}今天找不到版本,无法为任务{}生成版本", dependentId, heraJob.getId());
                    noAction = true;
                    break;
                }
                if (StringUtils.isBlank(actionMinDeps)) {
                    actionMinDeps = dependentId;
                }
                //找到所依赖的任务中版本最少的作为基准版本。
                if (dependenciesMap.get(actionMinDeps).size() > dependenciesMap.get(dependentId).size()) {
                    actionMinDeps = dependentId;
                } else if (dependenciesMap.get(dependentId).size() > 0 && dependenciesMap.get(actionMinDeps).size() == dependenciesMap.get(dependentId).size() &&
                        dependenciesMap.get(actionMinDeps).get(0).getId() < dependenciesMap.get(dependentId).get(0).getId()) {
                    //如果两个版本的个数一样  那么应该找一个时间较大的
                    actionMinDeps = dependentId;
                }
            }
            if (noAction) {
                idMap.put(heraJob.getId(), null);
            } else {
                List<HeraAction> actionMinList = dependenciesMap.get(actionMinDeps);
                if (actionMinList != null && actionMinList.size() > 0) {
                    List<HeraAction> insertList = new ArrayList<>();
                    for (HeraAction action : actionMinList) {
                        StringBuilder actionDependencies = new StringBuilder(action.getId().toString());
                        Long longActionId = Long.parseLong(actionDependencies.toString());
                        for (String dependency : dependencies) {
                            if (!dependency.equals(actionMinDeps)) {
                                List<HeraAction> otherAction = dependenciesMap.get(dependency);
                                if (otherAction == null || otherAction.size() == 0) {
                                    continue;
                                }
                                //找到一个离基准版本时间最近的action,添加为该任务的依赖
                                String otherActionId = otherAction.get(0).getId().toString();
                                for (HeraAction o : otherAction) {
                                    if (Math.abs(o.getId() - longActionId) < Math.abs(Long.parseLong(otherActionId) - longActionId)) {
                                        otherActionId = o.getId().toString();
                                    }
                                }
                                actionDependencies.append(",");
                                actionDependencies.append(Long.parseLong(otherActionId) / 1000000 * 1000000 + Long.parseLong(dependency));
                            }
                        }
                        HeraAction actionNew = new HeraAction();
                        BeanUtils.copyProperties(heraJob, actionNew);
                        Long actionId = longActionId / 1000000 * 1000000 + Long.parseLong(String.valueOf(heraJob.getId()));
                        actionNew.setId(actionId);
                        actionNew.setGmtCreate(new Date());
                        actionNew.setDependencies(actionDependencies.toString());
                        actionNew.setJobDependencies(heraJob.getDependencies());
                        actionNew.setJobId(heraJob.getId());
                        actionNew.setAuto(heraJob.getAuto());
                        actionNew.setHostGroupId(heraJob.getHostGroupId());
                        masterContext.getHeraJobActionService().insert(actionNew, nowAction);
                        actionMap.put(actionNew.getId(), actionNew);
                        insertList.add(actionNew);
                    }
                    idMap.put(heraJob.getId(), insertList);
                }
            }
        }

    }


  /**
     * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
     *
     * @param event
     */
    private void handleSuccessEvent(HeraJobSuccessEvent event) {
        if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
            return;
        }
        String jobId = event.getJobId();
        HeraActionVo heraActionVo = cache.getHeraActionVo();
        if (heraActionVo == null) {
            autoRecovery();
            return;
        }
        if (!heraActionVo.getAuto()) {
            return;
        }
        if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
            return;
        }
        if (heraActionVo.getDependencies() == null || !heraActionVo.getDependencies().contains(jobId)) {
            return;
        }
        JobStatus jobStatus;
        //必须同步
        synchronized (this) {
            jobStatus = heraJobActionService.findJobStatus(actionId);
            ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId);
            jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis()));
            heraJobActionService.updateStatus(jobStatus);
        }
        boolean allComplete = true;
        for (String key : heraActionVo.getDependencies()) {
            if (jobStatus.getReadyDependency().get(key) == null) {
                allComplete = false;
                break;
            }
        }
        if (allComplete) {
            ScheduleLog.info("JobId:" + actionId + " all dependency jobs is ready,run!");
            startNewJob(event.getTriggerType(), heraActionVo);
        } else {
            ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet()));
        }
    }
  1. generateDependJobAction算法比较复杂,其目的就是刷新下内存中 heraActionMap 中的依赖关系
  2.   当其依赖的独立任务完成时,jobHandler会收到消息,然后较验自己的所有依赖是否就绪然后发起任务命令

手动调度与手动恢复

        手动调度即为手动执行的任务,手动执行后自动加入任务队列,请注意,手动任务执行成功后不会通知下游任务(即:依赖于该任务的任务)该任务已经执行完成。

     手动恢复类似于手动调度,于手动调度的区别为此时如果该任务执行成功,会通知下游任务该任务已经执行完成

  /**
     * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
     *
     * @param event
     */
    private void handleSuccessEvent(HeraJobSuccessEvent event) {
        if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
            return;
        }
        //省略代码
    }

1.手动执行和debug执行流程相似,不做多介绍

2.Jobhandler中handleSuccessEvent对手动的进行了过滤

总结:

     关于抢占模式:

        基本上所有的调度工作都集中在master,通过权限控制可以很好的做个主备master,但备master会处于空闲。。

                 优化点:

                          master 处于单机运行模式,是否能提供集群方式运行

                                     如果集群关于work与master长链接维护的问题:

                                             如果不是和dubbo一样每个master和work建立长链接,就为有调用不了的问题。。

                                             如果work过多,对于master也是一种压力。。

                          master不建议设成work,是否可以改成 独占为master的work,不在接受工作调度

     关于机器选用:

            先去数据库(缓存)查询任务对应分组机子,与在线机子做差集。然后平均负载。因为缓存的关系,新加一台work要等待半小时。

            优化点:

                      作为work是不是应该自己所在的分组。。为什么不在启动的时候或者web设置的时候告诉work是属于什么分组的

    关于NIO通讯模型的使用

               hera的设计于master与work的通讯基本上采用 master发出请求,然后等待,work处理返回, master做后置处理,调用事件模型dispatch,做通知。

             优化点:

                   nio的特点是多路复用,netty框架的整个设计也是通知模式的。为什么采用同步等待+事件模型的方式。。 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

赫拉(hera)分布式任务调度系统 的相关文章

  • String.format()方法使用详解技巧

    一 时间切割 String year String format tY 2020 04 30 2020 String month String format tm 2020 04 30 04 String day String format
  • [454]bokeh之bokeh.layouts

    如果希望在同一张图上显示多个图像 可以使用bokeh layouts类中的方法 row column gridplot widgetbox layout row row 的作用是将多个图像以行的方式放到同一张图中 from bokeh io

随机推荐

  • 出现Command ‘locate‘ not found,but can be installed with:apt install mlocate解决方法

    出现Command locate not found but can be installed with apt install mlocate解决方法 在使用Ubuntn出现Command locate not found but can
  • 以太坊区块链浏览器搭建

    链客 专为开发者而生 有问必答 此文章来自区块链技术社区 未经允许拒绝转载 当然 读者若要实践 那么电脑上必须已经搭建好了geth 并且命令 geth version 能显示版本信息 针对以太坊各个链 私链 公链 测试链 都可以用该篇文章来
  • Keil MDK误将Project窗口关了的解决办法,窗口视图重置

    在使用MDK时 误将Project窗口或者其他窗口关了 点击view 选择对应的窗口即可 或者点击Window窗口 选择Reset View to Defaults 再点击Reset即可实现窗口的重置
  • 项目管理:要做一项任务,不要做一堆事儿

    作为项目经理 我们最终的任务和目标是把项目高标准的完成 在完成最终目标的过程中离不开项目成员的协作配合和任务工作的分配 只有项目组成员各司其职 高效的完成各自的工作 才能保证项目的效率和质量 那么如何保证项目组成员在完成各自任务的时候既能保
  • 打包vue前端docker镜像

    1 安装好docker环境 docker v 查看是否成功 2编写Dockerfile文件 下面这个dokcerfile的RUN指令不好 当有多个命令需要执行的时候 可以用换行符和连接符隔开 而不是写多个RUN指令 因为那样会增加镜像的构建
  • Go基础(包、变量和函数):开启Go语言之旅

    开启Go语言之旅 Go编程语言是一个开源项目 可以让程序员提高工作效率 Go是富有表现力 简洁 干净和高效的 其并发机制使编写充分利用多核和联网机器的程序变得容易 而其新颖类型系统则可实现灵活的模块化程序构建 快速编译为机器代码 但具有垃圾
  • apple资讯

    6 月 10 日消息 macOS 12 Monterey 支持通过 AirPlay 将内容发送到第二台 Mac 上 用户还可以使用该功能进行屏幕镜像 将一台备用机器变成一个外部显示器 除了使用 AirPlay 无线方式外 macOS 12
  • MATLAB雷达空时自适应处理

    空时自适应处理是一个用来描述同时处理空域和时域的自适应阵列的术语 信号的空域分量由阵列传感器收集 与所有阵列工作相同 而信号的时域分量用每个阵列传感器后等间隔延时单元产生 为此目的 一个尺寸N阵列有N个子通道 每个传感器后面对应一个 在每个
  • Java Servlet生成Json格式数据

    2019独角兽企业重金招聘Python工程师标准 gt gt gt Java Servlet生成Json格式数据 分类 Web JAVA2013 09 17 14 38 4805人阅读 评论 1 收藏 举报 在Servlet中覆写doGet
  • java 提交表单_http常见的form表单请求方式

    在Web开发中 我们使用的比较多的HTTP请求方式基本上就是GET POST 一 http请求常见的表单文件上传形式 首先了解下application x www form urlencoded和multipart form data的区别
  • 渗透测试-木马免杀的几种方式

    前言 免杀 又叫免杀毒技术 是反病毒 反间谍的对立面 是一种能使病毒或木马免于被杀毒软件查杀的软件 它除了使病毒木马免于被查杀外 还可以扩增病毒木马的功能 改变病毒木马的行为 免杀的基本特征是破坏特征 有可能是行为特征 只要破坏了病毒与木马
  • Ubuntu18.04 编译安装llvm-clang

    背景知识 LLVM和GCC的区别 传统编译器 传统编译器的工作原理基本上都是三段式的 可以分为前端 Frontend 优化器 Optimizer 后端 Backend 前端负责解析源代码 检查语法错误 并将其翻译为抽象的语法树 Abstra
  • ASP.NET WebApi + Autofac 实现依赖注入

    一 项目情况 框架 NET Framework 4 5 Autofac 3 5 0 Autofac WebApi2 4 3 0 二 定义接口与对应实现 接口1 public interface IBaseUserService List
  • 用Jupyter完成numpy、pandas、matplotlib三个库的例题

    文章目录 实验环境 一 numpy例题 二 pandas例题 三 matplotlib例题 四 总结 实验环境 jupyter notebook 一 numpy例题 生成一个一维数组 起始值为5 终点值为15 样本数为10个 import
  • 【转载】使用jsoup替换HTML标记

    原始代码 String html font fsdfs font dfsdf font dasdasd font Document doc Jsoup parse html Elements elements doc select font
  • 【frida逆向开发】frida-rpc远程调用某安app方法获取token

    目录 一 使用fiddler对app进行抓包 二 反编译app定位关键代码 三 frida rpc调用相关方法 一 使用fiddler对app进行抓包 通过抓包可以看到请求参数中X App Token e8f1c71569a7166b6aa
  • AD隐藏元件标号

    1 右键点击元件 不是标号 2 查找相似对象 3 点击应用 4 再点确定 5 把Show Name右边的勾取消
  • Go 语言进阶(一) -- Go hertz http框架、kitex RPC微服务框架、gorm 数据库框架三件套用法详解

    Go 框架三件套 1 概论 Gorm Gorm 是一个已经迭代了10年 的功能强大的 ORM框架 在字节内部被广泛使用并且拥有非常丰富的开源扩展 Kitex Kitex 是字节内部 Golang 微服务 RPC 框架 具有高性能 强可扩展的
  • 必看的知识

    学习路线 必看的知识 Spring实战 Spring 4 x企业应用开发实战 深入分析Java Web技术内幕 修订版 Effective Java Thinking in Java Java核心技术 Core Java Thinking
  • 赫拉(hera)分布式任务调度系统

    相关介绍 赫拉 hera 分布式任务调度系统之架构 基本功能 一 赫拉 hera 分布式任务调度系统之项目启动 二 赫拉 hera 分布式任务调度系统之开发中心 三 赫拉 hera 分布式任务调度系统之版本 四 赫拉 hera 分布式任务调