ScheduledThreadPoolExecutor周期定时任务异常处理踩坑的问题!!

2023-11-20

问题原因

在公司写项目的时候,有一个周期定时任务的需求,就想着阿里巴巴开发手册里不是说不能用Executors去创建线程池,因为存在如下问题:

  • FixedThreadPool和SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
  • CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

然后就没用Executors.newScheduledThreadPool(),然后自己new一个ScheduledThreadPoolExecutor对象,并重写了afterExecute方法,和自定义拒绝策略。

结果运行起来只执行一次就不打印日志了,这个问题困扰了我半天,所以留个笔记记录下来。
代码如下:

@Slf4j
@Component
public class PlanStartAndEndTask implements ApplicationRunner {

  /**
   * 初始化定时任务线程池
   */
  private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new RecordExceptionExecutionHandler()) {

    /**
     * 自定义异常处理
     * @param runnable 任务
     * @param throwable 异常
     * @date 2021/3/31
     */
    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
      final Logger log = LoggerFactory.getLogger(this.getClass());
      if (runnable instanceof Thread) {
        if (throwable != null) {
          log.error("自动开始/结束分享计划的定时任务出现异常,时间:{},异常信息:{}", LocalDateTime.now(), throwable.getMessage());
        }
      } else if (runnable instanceof FutureTask) {
        FutureTask<?> futureTask = (FutureTask<?>) runnable;
        try {
          // 问题就出在这!!!
          futureTask.get();
        } catch (InterruptedException e) {
          log.error("自动开始/结束分享计划的定时任务被打断,时间:{}", LocalDateTime.now());
          Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
          log.error("自动开始/结束分享计划的定时任务出现异常,时间:{},异常信息:{}", LocalDateTime.now(), e.getMessage());
        }
      }
    }
  };

  @Override
  public void run(ApplicationArguments args) throws Exception {
    // 为了模拟,首次延时时间0,周期为5秒钟一次
    executor.scheduleAtFixedRate(() -> {
      long startTime = System.currentTimeMillis();
        log.info("开始执行自动化任务");
        /** 省略业务代码 **/
        log.info("结束执行自动化任务,耗时:{}毫秒;", System.currentTimeMillis() - startTime);
    }, 0, 5000, TimeUnit.MILLISECONDS);
  }

  /**
   * 自定义实现拒绝策略,记录日志,队列满了之后,新任务被提交会直接被丢弃掉
   *
   * @author Zhu Lin
   * @date 2021/3/30
   */
  @Slf4j
  static class RecordExceptionExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
      log.error("任务:{}, 被{}拒绝 ", runnable.toString(), threadPoolExecutor.toString());
    }
  }

}

然后起初我以为是异常的问题导致只执行一次就不执行了,控制台也不打印异常信息,因为JavaDoc中也是这么说的

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is executions will commence after
     * {@code initialDelay} then {@code initialDelay+period}, then
     * {@code initialDelay + 2 * period}, and so on.
     * If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.  If any execution of this task
     * takes longer than its period, then subsequent executions
     * may start late, but will not concurrently execute.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param period the period between successive executions
     * @param unit the time unit of the initialDelay and period parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if period less than or equal to zero
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

简单来说就是,任务只有遇到异常时才会停止,否贼只有取消和终止执行程序才能终止任务
所以我就改成这样

  @Override
  public void run(ApplicationArguments args) throws Exception {
    // 首次延时时间0,周期为5秒钟一次
    executor.scheduleAtFixedRate(() -> {
      long startTime = System.currentTimeMillis();
      try {
        log.info("开始执行自动化任务");
      } catch (Exception e) {
        log.error("自动开始/结束分享计划的定时任务出现异常,时间:{},异常信息:{}", LocalDateTime.now(), e.getMessage());
      } finally {
        log.info("结束执行自动化任务,耗时:{}毫秒;", System.currentTimeMillis() - startTime);
      }
    }, 0, 5000, TimeUnit.MILLISECONDS);
  }

事实证明压根不是这个问题,毕竟我里面啥都没干,就打印日志,抛啥子异常咯~,然后我试了好多遍,终于在我把afterExecutor给注释掉后,程序居然正常了!最后我把问题定位到 futureTask.get() 这行代码上,通过debug发现,线程执行到这行代码之后就不会往下走了,那么原因到底是什么?我们深入来看一下

问题解析

首先我们先看看为什么get()会被阻塞住

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果任务已经在执行中了,那么就进入等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

// 等待任务执行完成
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 计算等待终止时间,如果一直等待的话,终止时间为 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    // 不排队
    boolean queued = false;
    // 无限循环
    for (;;) {
        // 如果线程已经被打断了,删除,抛异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 当前任务状态
        int s = state;
        // 当前任务已经执行完了,返回
        if (s > COMPLETING) {
            // 当前任务的线程置空
            if (q != null)
                q.thread = null;
            return s;
        }
        // 如果正在执行,当前线程让出 cpu,重新竞争,防止 cpu 飙高
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
            // 如果第一次运行,新建 waitNode,当前线程就是 waitNode 的属性
        else if (q == null)
            q = new WaitNode();
            // 默认第一次都会执行这里,执行成功之后,queued 就为 true,就不会再执行了
            // 把当前 waitNode 当做 waiters 链表的第一个
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
            // 如果设置了超时时间,并过了超时时间的话,从 waiters 链表中删除当前 wait
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 没有过超时时间,线程进入 TIMED_WAITING 状态
            LockSupport.parkNanos(this, nanos);
        }
        // 没有设置超时时间,进入 WAITING 状态
        else
            LockSupport.park(this);
    }
}

我们可以看到上面那行注释为“当前任务已经执行完了,返回”的代码,只要不满足这个条件,你就会被一直阻塞,那么问题肯定出在我提交的定时任务state从来就没有被改变,这又是为什么?我们继续深究
接下来我们看到ScheduledThreadPoolExecutor#scheduleAtFixedRate方法

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

这里的核心逻辑就是将 Runnable 包装成了一个ScheduledFutureTask对象,这个包装是在FutureTask基础上增加了定时调度需要的一些数据。(FutureTask是线程池的核心类之一)decorateTask是一个钩子方法,用来给扩展用的,在这里的默认实现就是返回ScheduledFutureTask本身。
然后主逻辑就是通过delayedExecute放入队列中。
image.png
那么为什么我们的任务state状态没有改变,肯定就是ScheduledFutureTaskrun方法啦。

/**
 * Overrides FutureTask version so as to reset/requeue if periodic.
 */

public void run() {
    // 是否是周期性任务
    boolean periodic = isPeriodic();
    // 如果不可以在当前状态下运行,就取消任务(将这个任务的状态设置为CANCELLED)。
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        // 如果不是周期性的任务,调用 FutureTask # run 方法
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 如果是周期性的,设置下次执行时间
        setNextRunTime();
        // 再次将任务添加到队列中
        reExecutePeriodic(outerTask);
    }
}

我们重点看ScheduledFutureTask.super.runAndReset()方法,实际上调用的是其父类FutureTaskrunAndReset()方法,这个方法会在执行成功之后重置线程状态,reset就是这个语义。同时我们可以看到,当方法执行返回false的时候,就不会再次将任务添加的队列中,这和我们最开始假设的异常情况是一致的
最后答案就在这个runAndReset和run方法的区别里:

    public void run() {
        /** 省略其他代码 **/
        try {
            // 执行任务
            result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result);
        /** 省略其他代码 **/
    }

    protected boolean runAndReset() {
        /** 省略其他代码 **/
        try {
            // 执行任务
            c.call(); // don't set result
            ran = true;
        } catch (Throwable ex) {
            setException(ex);
        }
        /** 省略其他代码 **/
    }

上面的代码我省略掉了大部分,只留出了这次问题所在的地方,感兴趣的小伙伴可以自己去ide里看看,c.call()是执行任务的地方,这里有一个默认为false的ran变量,当任务执行成功时,ran会被设成 true,即任务已执行。但这不是关键,关键是我们发现run方法里在成功后回去调一个set方法

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

在set方法中修改了state的状态,这也证明了我们之前的逻辑,周期任务调runAndReset压根不去修改state,所以get方法只能阻塞,没有其他选择。

小疑惑

其实如果按阿里巴巴开发手册的规范来说的话,ScheduledThreadPoolExecutor也存在允许创建的线程数据为Integer.MAX_VALUE的问题,那么该怎么解决呢,这点我有点疑惑。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ScheduledThreadPoolExecutor周期定时任务异常处理踩坑的问题!! 的相关文章

  • 在 Kotlin 中实现返回 Collection 的 Java 方法

    我将 Kotlin 与 Spring Security 结合使用 实现该方法时 public interface UserDetails extends Serializable Collection
  • .java 和 .scala 类之间是否可能存在循环依赖?

    假设我在 java 文件中定义了类 A 在 scala 文件中定义了类 B A 类使用 B 类 B 类使用 A 类 如果我使用 java 编译器 则会出现编译错误 因为 B 类尚未编译 如果我使用scala编译器A类将找不到 有没有可以同时
  • JUnit 使用 Mockito 测试异步方法

    我已经使用 Spring Framework 版本 5 0 5 RELEASE 在 Java 1 8 类中实现了异步方法 public class ClassToBeTested Autowired private MyComponent
  • 如何在Spring的applicationContext.xml中指定默认范围来请求范围?

    我想让所有 bean 请求默认作用域 但是 Spring 文档说默认作用域是 Singleton 第 3 4 1 和 3 4 2 节http static springsource org spring docs 2 5 x referen
  • 方法不必要地被调用?

    我有一个 BaseActivity 它可以通过其他所有活动进行扩展 问题是 每当用户离开 暂停 活动时 我都会将音乐静音 我也不再接听电话 问题是 onPause每当用户在活动之间切换时就会被调用 这意味着应用程序不必要地静音和停止tele
  • firestore快照监听器生命周期和定价之间有什么关系?

    在我的活动中 我有一个字符串列表 这些字符串表示我想要附加快照侦听器的 Firestore 文档 我使用 Acivity ModelView 存储库结构 在活动的 onCreate 中 我向 ViewModelProvider 询问适当的
  • 如何在具有动态列的表中插入值 Jdbc/Mysql

    我想在具有动态列的表中添加值 我设法创建一个包含动态列的表 但我不知道如何插入数据 Create Table sql CREATE TABLE MyDB myTable level INTEGER 255 int columnNumber
  • 我们可以在三元运算符(Java)中使用命令吗?

    这是一个工作代码 String a first String b second String object System out println object null a b 但它不是 String a first String b se
  • 独占锁定ConcurrentHashMap

    我知道不可能锁定 ConcurrentHashMap 进行独占访问 但是 我找不到原因 是因为构成CHM的 Segment 没有被api公开吗 据推测 如果是的话 客户端代码可以执行 交接 锁定 Cheers 我知道不可能锁定 Concur
  • 会话 bean 中的 EntityManager 异常处理

    我有一个托管无状态会话 bean 其中注入了 EntityManager em 我想做的是拥有一个具有唯一列的数据库表 然后我运行一些尝试插入实体的算法 但是 如果实体存在 它将更新它或跳过它 我想要这样的东西 try em persist
  • LibGdx 如何使用 OrthographicCamera 滚动?

    我已经找了 10 个小时 字面意思 我已经完成了 我需要问一下 事情是我正在学习如何使用 LibGdx 来编写 Java 游戏 我正在做一个水平太空飞船游戏 所以 我最糟糕的问题是我不知道如何滚动 我认为绘制会更好地解释 我想绘制一个巨大的
  • 如何自定义JProgressBar?

    我正在制作一个启动器 我想要一个自定义的进度栏 我已经做了一些研究 并且可以使用 JavaFX 从未用它做过任何事情 并且可以通过替换 UI 来实现 我正在寻找一个具有圆形边缘和圆形填充的酒吧 像这样的事情 package gui impo
  • 如何在将数据发送到 Firebase 数据库之前对其进行加密?

    我正在使用 Firebase 实时数据库制作聊天应用程序 我知道 Firebase 非常安全 只要您的规则正确 但我自己可以阅读使用我的应用程序的人的所有聊天记录 我想阻止这种情况 为此我需要一种解密和加密方法 我尝试使用凯撒解密 但失败了
  • 使用外部硬盘写入和存储 mysql 数据库

    我已经设置了 mysql 数据库在我的 Mac 上使用 java 和 eclipse 运行 它运行得很好 但现在我将生成大约 43 亿行数据 这将占用大约 64GB 的数据 我存储了大量的密钥和加密值 我有一个 1TB 外部我想用作存储位置
  • @TestPropertySource 不适用于 Spring 1.2.6 中使用 AnnotationConfigContextLoader 的 JUnit 测试

    似乎我在 Spring 4 1 17 中使用 Spring Boot 1 2 6 RELEASE 所做的任何事情都不起作用 我只想访问应用程序属性并在必要时通过测试覆盖它们 无需使用 hack 手动注入 PropertySource 这不行
  • 使用 Maven 3 时 Cobertura 代码覆盖率为 0%

    读完这篇文章后 将 Cobertura 与 Maven 3 0 2 一起使用的正确方法是什么 https stackoverflow com questions 6931360 what is the proper way to use c
  • 使用 Android 的 Mobile Vision API 扫描二维码

    我跟着这个tutorial http code tutsplus com tutorials reading qr codes using the mobile vision api cms 24680关于如何构建可以扫描二维码的 Andr
  • Java 9 中紧凑字符串和压缩字符串的区别

    有什么优点紧凑的字符串 http openjdk java net jeps 254JDK9 中的压缩字符串 压缩字符串 Java 6 和紧凑字符串 Java 9 都有相同的动机 字符串通常实际上是 Latin 1 因此浪费了一半的空间 和
  • 如何在Java中跨类共享变量,我尝试了静态不起作用

    类 Testclass1 有一个变量 有一些执行会改变变量的值 现在在同一个包中有类 Testclass2 我将如何访问 Testclass2 中变量的更新值 由 Testclass1 更新 试过这个没用 注意 Testclass1和Tes
  • 如何使用 Spring AOP 建议静态方法?

    在执行类的静态方法之前和之后需要完成一些日志记录 我尝试使用 Spring AOP 来实现这一点 但它不起作用 而对于正常方法来说它起作用 请帮助我理解如何实现这一点 如果可以使用注释来完成 那就太好了 也许您应该在使用 Spring AO

随机推荐

  • 深度学习模型参数量/计算量(附计算代码)

    参考 https mp weixin qq com s biz MzI4MDYzNzg4Mw mid 2247546551 idx 2 sn f198b6365e11f0a18832ff1203302632 chksm ebb70e63dc
  • unity 性能查看工具Profiler

    文章目录 前言 profiler工具介绍 菜单栏 帧视图 模块视图 模块详细信息 通过profiler分析优化游戏性能 最后 前言 每次进行游戏优化的时候都用这个工具查看内存泄漏啊 代码优化啊之类的东西 真的好用 但是之前也就是自己摸索一下
  • 【FPGA多周期时序约束详解】- 解读FPGA多周期时序约束的全过程

    FPGA多周期时序约束详解 解读FPGA多周期时序约束的全过程 FPGA作为数字电路设计的常见工具 其设计中必然会遇到时序约束的问题 而多周期时序约束更是FPGA设计中不可避免的难点之一 本文将详细介绍FPGA多周期时序约束的全过程 并结合
  • PHP 使用 Kafka 安装拾遗

    最近项目开发中需要使用 Kafka 消息队列 经过检索 PHP下面有通用的两种方式来调用 Kafka php rdkafka 扩展 以 PHP 扩展的形式进行使用是非常高效的 另外 该项目也提供了非常完备的 文档 不过在 Mac 环境中安装
  • Android9 默认开启/关闭GPS

    gps默认打开 需要关闭的话 修改以下文件 frameworks base packages SettingsProvider res values defaults xml 将
  • xp系统蓝屏,xp系统蓝屏的详细解决过程

    xp系统蓝屏的详细解决过程 现在XP系统微软停止开发了 服务也升级不得了 刚刚使用时还是好好的 能够正常的运行 怎么一言不合就蓝屏了呢 那么xp蓝屏怎么办呢 跟你们分享一下小编解决xp蓝屏的经验吧 重新启动 快速按F8 用箭头上下选择 最后
  • 2022VLMo: Unified Vision-Language Pre-Training with Mixture-of-Modality-Experts

    摘要 我们提出了一个统一的视觉 语言预训练模型 VLMo 该模型与一个模块化的transformer网络共同学习一个双编码器和一个融合编码器 具体地 我们引入了模态混合专家 MoME Transformer 其中每个块包含一个特定于模态的专
  • tensorRT部署之 代码实现 onnx转engine/trt模型

    tensorRT部署之 代码实现 onnx转engine trt模型 前提已经装好显卡驱动 cuda cudnn 以及tensorRT 下面将给出Python C 两种转换方式 1 C 实现 项目属性配置好CUDA tensoeRT库 通常
  • 吴恩达机器学习笔记系列(五)——梯度下降

    一 gradient descent 梯度下降 1 概念 线性回归的目的就是找出使得误差 损失函数 最小的参数值 可以用梯度下降来确定 参数的大小 梯度下降是一种迭代方法 能够求解局部最小值 结果与初始点的选取有关 为了找到最小值 从某一点
  • ADFS 4.0 证书更新

    ADFS 4 0 证书更新 由于公网证书的过期 需要重新更新ADFS的服务通信证书 证书要求 带私钥 PFX格式 更换流程 证书安装到 证书 计算机 个人 安装后点开证书能看到 你有一个与证书对应的私钥 右击证书 gt 所有任务 gt 管理
  • 【uboot内核适配学习】uboot 修改默认ip

    1 修改默认ip作用 设备出场的时候都需要默认的ip 2 修改措施 找到uboot芯片配置文件 不同芯片厂家适配的文件必定是不一样的 位置也可能不一样 define CONFIG ETHADDR 00 40 5c 26 0a 5b MAC地
  • 面经——小米面经(2021春招)

    摘自 小米面经 2021春招 感谢小米 感谢雷总 感谢上官可编程 作者 阿波罗啦啦啦啦 发布时间 2021 05 01 11 08 41 网址 https blog csdn net weixin 44933419 article deta
  • 博客营销

    1 博客营销有什么价值 应注意什么 1 博客可以直接带来潜在用户 2 博客营销的价值体现在降低网站推广费用方面 3 博客文章内容为用户通过搜索引擎获取信息提供了机会 4 博客文章可以方便地增加企业网站的链接数量 5 可以实现更低的成本对读者
  • 指标体系建设

    1 背景 结合业务场景将多个不同指标和维度进行组合 从而针对某一真实业务场景进行数据分析和决策导向 并能在整体业务变化中发现和定位问题 2 概念理解与示例分析 2 1 指标体系 指标体系 名称 分类 解析 作用 示例 指标 结果型指标 时机
  • 汉诺塔问题 java

    汉诺塔问题 public class HanoiTower 编写一个main方法 public static void main String args Tower t1 new Tower t1 move 5 A B C 汉诺塔问题的解决
  • React事件处理及事件流

    React事件处理 React事件处理是通过将事件处理器绑定到组建上处理事件 事件触发的同时更新组建的内部状态 内部状态更新会触发组件的重绘 React 元素的事件处理和 DOM 元素的事件处理很相似 但语法上的略有区别 在React中事件
  • 如何删除gitee仓库下的文件

    有时我们可能在上传项目到github或者gitee时 忘记忽略了某个文件 就直接push上去了 最后发现上传多了 如何删除掉远程仓库中的文件呢 注 在github上我们只能删除仓库 无法删除文件夹或文件 所以只能通过命令 2 打开GitBa
  • Python 程序设计习题(4) —— 列表与元组

    目录 1 Python 习题部分 2 Python 习题讲解 列表 元组 其他 1 Python 习题部分 要想学习一门语言 便少不了练习 故附上部分 Python 习题 供大家学习参考 如有错误之处 还望指正 1 二年级一班举行了数学考试
  • springboot项目中application.properties无法变成小树叶问题解决

    1 检查我们的resources目录的状态 看看是不是处在普通文件夹的状态 如果是的话 我们需要重新mark一下 右键点击文件夹 选择mark directory as resources root 此时我们发现配置文件变成了小树叶 2 如
  • ScheduledThreadPoolExecutor周期定时任务异常处理踩坑的问题!!

    问题原因 在公司写项目的时候 有一个周期定时任务的需求 就想着阿里巴巴开发手册里不是说不能用Executors去创建线程池 因为存在如下问题 FixedThreadPool和SingleThreadPool 允许的请求队列长度为 Integ