Netty源码剖析之HashedWheelTimer时间轮

2023-10-26

版本信息:

JDK1.8

Netty-all:4.1.38.Final

时间轮的介绍

我们知道钟表分为很多块,每时钟滴答一次就往前走一个块,而时间轮就是使用这个思想。如下图

上图总共分为8块,每过100ms就往前走一块,然后周而复始。此时,我们能不能在每一块上挂载任务呢,然后每过100ms就执行块上的任务,实现类似于Scheduled延迟调度任务的功能。

下面使用一个案例+画图介绍一下时间轮。

此时,每块的间隔是100ms,时间轮的current指针已经执行了400ms,此时插入一个延迟200ms调度的任务进来。

而插入之需要算出时间轮的current指针的时间,然后加上本次调度的时间,就可以直接往哪一块添加任务,所以插入的效率是O1时间复杂度,不过在冲突的情况下需要使用链表链起来。而解决冲突的最好办法就是把块增多减少碰撞(HashMap同样的思想)

如果某个节点发生了碰撞,存在3个任务都在一个块,当current执行到哪一块的时候,就会串行化执行3个任务,如果任务中存在耗时任务,那么其他任务就会延迟执行,超过预期的执行时间,也会影响到整体的current前进,导致整体的时间对不上。所以使用时间轮的任务需要对时延的准确性低,并且尽量保证任务本身精简不携带耗时操作~

时间轮和小顶堆的区别

PriorityQueue优先级队列icon-default.png?t=N7T8https://blog.csdn.net/qq_43799161/article/details/132734047?spm=1001.2014.3001.5502

在上篇文章中讲述了PriorityQueue优先级队列,它底层由小顶堆实现(完全二叉树),在插入元素的时候需要向上调整(siftUp),在取出元素的时候需要向下调整(siftDown),调整的过程是非常浪费性能,尤其是数据量过多的时候。

而时间轮通过O1的时间复杂度直接定位在哪一块上,如果有冲突就使用链表把定位在同一块的任务链起来,不需要任何的调整,整体效率比小顶堆高,尤其是数据量大的时候差距就更加的明显~

HashedWheelTimer源码分析

直接从构造方法入手~

/**
 * @param threadFactory         线程工厂
 * @param tickDuration          间隔时间,默认是100
 * @param unit                  时间单位,默认是毫秒ms
 * @param ticksPerWheel         总块数,默认是512块
 * @param leakDetection         是否泄漏检测,默认为true
 * @param maxPendingTimeouts    最大任务数,默认为-1,-1代表无限数量。
 * */
public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
    long maxPendingTimeouts) {

    // 创建HashedWheelBucket数组。数组大小为ticksPerWheel,默认512快。(会优化成2的指数倍数,因为要hash运算)
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;    // hash运算掩码

    // 把用户传入的间隔单位转换成纳秒。
    long duration = unit.toNanos(tickDuration);

    // 时间间隔小于默认的最小值(最小值为1毫秒)
    if (duration < MILLISECOND_NANOS) {
        // 赋值为默认的最小值
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        // 正常赋值
        this.tickDuration = duration;
    }

    // 创建时间轮的工作线程
    workerThread = threadFactory.newThread(worker);

    // 默认为-1,也即为无限大
    // 当然这个值用户可以自行传入。
    this.maxPendingTimeouts = maxPendingTimeouts;
}

对构造方法做一个总结:

  1. 创建HashedWheelBucket数组,这个数组就是时间轮的块,默认有512块。所以也尽可能的减少碰撞
  2. 把用户传入的间隔时间,默认为100ms,转换成纳秒,因为纳秒计算保证了精准性
  3. 创建时间轮的工作线程,此工作线程的指责是每次的100ms滴答,执行每个块的任务
  4. 赋值总任务量,默认为-1,也即默认无限多。

构造方法把一切初始化好了,创建了线程,所以需要找到线程在那里开启,线程的执行代码~

既然已经初始化好了,那么就看到创建延迟调度任务的方法,此方法中启动了时间轮工作线程

/**
    * @param task  任务
    * @param delay 延迟时间
    * @param unit  时间单位
    * @author liha
    * 
*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 限流。
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }

    // 开启线程。
    // 内部使用状态机+unsafe保证只会有一个线程启动
    start();

    // 当前时间 + 本次延迟调度的时间 - 时间轮开始的时间 = 本次调度的绝对时间。
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // 创建任务,往时间轮的工作线程队列投递
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);      
    return timeout;
}

对创建延迟调度任务的方法做一个总结:

  1. 限流操作,是否达到了用户设置的总任务数的阈值
  2. 启动线程,内部使用状态机+unsafe保证只会有一个线程启动
  3. 算出本次调度的绝对时间。而绝对时间是从工作线程启动的时候开始算的,为什么要这么算?因为工作线程启动时钟就开始滴答,也即current指针开始移动。所以我们有必要把这些时间算进去 ,再加上本次延迟调度的时间 ,就等于最终调度的绝对时间。
  4. 创建出HashedWheelTimeout对象,此对象就是延迟调度任务
  5. 多线程之间的传输任务肯定是使用队列,所以使用队列将HashedWheelTimeout投递到工作线程中

所以,我们接下来看到时间轮工作线程。

// 线程执行点。
@Override
public void run() {
    // 当前线程的启动时间
    startTime = System.nanoTime();         

    // 唤醒阻塞在等待此线程启动的线程。
    startTimeInitialized.countDown();           

    do {
        // 使用休眠模拟滴答。
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // 算出本次滴答执行的任务在那个索引位置。
            int idx = (int) (tick & mask);

            // 处理取消的任务
            processCancelledTasks();

            // 取出当前滴答索引对应的HashedWheelBucket
            HashedWheelBucket bucket =
            wheel[idx];

            // 从队列中取出其他线程投递的HashedWheelTimeout调度任务。
            transferTimeoutsToBuckets();

            // 处理当前批次的。
            bucket.expireTimeouts(deadline);
            
            // 为下次滴答+1。
            tick++;         
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // 跳出do while循环代表已经处于stop状态,所以需要做收尾工作。
    // 把队列中还没有处理的任务返回给用户自行去处理
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
  1. 获取到当前线程启动的时间
  2. 唤醒等待当前线程启动的线程
  3. waitForNextTick方法使用休眠模拟时钟滴答
  4. 算出本次滴答后需要执行的块的索引
  5. 处理取消的任务
  6. 通过块的索引拿到HashedWheelBucket
  7. 从队列中取出其他线程投递的HashedWheelTimeout调度任务。
  8. 处理当前HashedWheelBucket中的任务
  9. 为下次滴答+1。
  10. 当工作线程进入到stop状态后,会把没有执行的任务打包,当用户调用stop方法会拿到这些没处理的任务,交给用户自行处理。

这里我们看到waitForNextTick方法如何使用休眠模拟时钟滴答

private long waitForNextTick() {
    // tick是总滴答的次数。
    // 滴答间隔 * 总滴答的次数+1 = 本次滴答完后的总滴答时间
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        // 得到工作线程从启动开始总共运行的时间(这是一个相对时间)
        final long currentTime = System.nanoTime() - startTime;

        /**
         * 这里拿本次应该滴答后达到的时间 - 工作线程从启动开始总共运行的时间 = 本次睡眠的时间
         * 注意:这里可能已经是负数了,因为执行之前的调度任务需要时间
         * + 999999 / 1000000 是为了四舍五入,并且把纳秒转换成毫秒,因为sleep方法需要毫秒
         * */
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        // 小于0直接返回,代表达到时间了。
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        try {
            // 睡眠
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

这里比较简单,就是算出本次滴答后的绝对时间 - 当前工作线程总共执行的时间 = 本次应该休眠的时间,然后去Thread.sleep 睡眠模拟时钟滴答。

我们继续看到transferTimeoutsToBuckets方法是如何接受队列的HashedWheelTimeout调度任务

private void transferTimeoutsToBuckets() {
    // 尝试10000次,如果10000次还没有队列来就下一轮再处理,因为我们不能在这里浪费过多的时间影响到精准度
    for (int i = 0; i < 100000; i++) {
        // 从队列中尝试取出。
        HashedWheelTimeout timeout = timeouts.poll();

        // timeout.deadline是拿到用户传入的调度时间
        // tickDuration 这个是一次滴答的时间。
        // 所以这里算出调度需要多少次滴答
        long calculated = timeout.deadline / tickDuration;
        
        // 计算出多少轮可以调度。
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        final long ticks = Math.max(calculated, tick); 

        // hash运算,得到HashedWheelBucket数组的索引。
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // 添加到HashedWheelBucket,其中使用双向链表,等待被调度。
        bucket.addTimeout(timeout);
    }
}

这里for循环尝试10w次,因为不能尝试太多次,不然会影响到调度的精准度。

每次尝试从队列中获取到调度任务,计算出当前任务需要多少个滴答,最后hash运算添加到对应的HashedWheelBucket中,等待被调度。

在本文的最后看一下,如何调用任务,看到HashedWheelBucket的expireTimeouts方法

public void expireTimeouts(long deadline) {

    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // 小于等于0 代表可以被调度了,要不然就-1
        if (timeout.remainingRounds <= 0) {  
            // 要被调度的任务就从链表中移除。   
            next = remove(timeout); 
            if (timeout.deadline <= deadline) {
                // 执行任务。
                timeout.expire();
            } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            // 被取消了,所以从队列中移除
            next = remove(timeout);
        } else {        
            // 整整大了N个周期,所以-1,等到remainingRounds为0的时候就是需要被调度
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

这里就非常的容易了,直接遍历双向链表,串行化的执行HashedWheelTimeout的expire方法,在expire方法中会调用用户传入的业务逻辑 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty源码剖析之HashedWheelTimer时间轮 的相关文章

  • 使用java 8流的if-else条件[重复]

    这个问题在这里已经有答案了 设想 有一种情况我需要设置一些值List使用 Java 8 流 API 基于某些字段条件的对象 下面是对象的示例User public class User private int id private Stri
  • Swing 应用程序的丰富日历组件[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我的公司正在向 Swing 应用程序添加一些丰富的日历功能 我们希望它像 Outlook 日历一样工作 以下是我们的一些要求 日 周和月的
  • 已完成 Java 项目,现在创建 jar 或 .exe 文件(带有数据库)

    所以 我刚刚完成了一个小的java应用程序 带有数据库和其他东西 我使用了 Netbeans 和 Mysql 现在我想导出我的项目 这样我就可以在任何我想要的地方使用它 任何计算机 即使没有安装 Mysql 或 Java 所以 我尝试了一些
  • 无法应用插件 [id 'forge']

    这是一个延续这个问题 https stackoverflow com questions 58312064 upgrading gradle我的第一个问题已经解决了 但是新的问题又出现了 按照其中提到的教程 解决了一些错误后 现在当我尝试运
  • 如何获取今天的日期?

    换句话说 我想要提供 Joda Time 的功能 today today withTime 0 0 0 0 但没有 Joda Time 只有 java util Date setHours 等方法已被弃用 还有更正确的方法吗 Date to
  • Java:CopyOnWriteArrayList 与 SynchronizedList

    有什么区别CopyOnWritearraylist and Collections synchronizedList 什么时候应该优先选择其中一个 CopyOnWriteArrayList当读取次数远远超过写入次数时 应使用列表 这是因为您
  • Spring真的不支持接口注入吗?

    我知道 Spring 不支持接口注入 而且我已经读过很多次了 但今天 当我看到 Martin Fowler 写的一篇关于国际奥委会的文章时 link http martinfowler com articles injection html
  • 用于匹配文件中的十六进制数字的 Java 正则表达式

    所以我正在读取一个文件 例如java程序 58 68 58 68 40 c 40 48 FA 如果我幸运的话 但更常见的是 它在每行之前和之后都有几个空白字符 这些是我正在解析的十六进制地址 我基本上需要确保我可以使用扫描仪 缓冲阅读器等来
  • 如何在 Intellij IDEA 中使用本机库制作 jar?

    如何在 Intellij IDEA 中使用本机库制作 jar 在 JVM 中 它看起来像 Djava library path C Users User workspace lib native win None
  • Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件

    我在跑步spark job在有 2 个工作节点的集群中 我使用下面的代码 spark java 将计算的数据帧作为 csv 保存到工作节点 dataframe write option header false mode SaveMode
  • 如何模拟 DefaultCellEditor 的“onStartCellEditing”

    CellEditorListener 有 editingStopped 和 editingCancelled 但是 我如何实现在单元格编辑会话开始时需要运行的一段代码呢 一个典型的示例可能是 当您开始编辑时 您希望 JTextField 编
  • 如何在客户端使用 Java 读取 gRPC 中的元数据

    我正在使用 Java 和 Protoc 3 0 编译器 我的 proto 文件如下所述 https github com openconfig public blob master release models rpc openconfig
  • Java 中的 \x 转义?

    我想知道 Java 中是否有类似 C 中的十六进制 x 转义 例如 char helloworld x48 x45 x4C x4C x4F x20 x57 x47 x52 x4C x44 printf s helloworld 从目前看来
  • Vaadin:如何将 META-INF/服务添加到战争中?

    我有一个 Vaadin 7 maven Web 项目 其中有一些注释可以在其上创建服务定义META INF services 我将其添加到 pom 中 以便处理注释
  • 使用 Flexjson 将 JSON 列表反序列化为对象列表

    我正在尝试反序列化以下 json books id 1 name book 1 id 2 name book 2 进入列表 之前用这个 json 工作过 id 1 name book 1 id 2 name book 2 使用此代码 Lis
  • Spring 4 中有多个@ComponentScan?

    我正在使用 Spring 4 16 和 Java Annotations 我想做一些类似的事情 Configuration ComponentScan basePackages com example business includeFil
  • 使用 Java 将 PDF 页面导出为一系列图像

    我需要将任意 PDF 文档的页面导出为 jpeg png etc 格式的一系列单独图像 我需要在 Java 中执行此操作 虽然我确实了解 iText PDFBox 和各种其他 java pdf 库 但我希望能找到一些工作示例或一些操作方法
  • 字符串交错的动态规划问题解决方案

    我试图解决这个问题 但我放弃了 找到了下面的解决方案 尽管我不明白该解决方案是如何工作的 或者为什么它有效 任何深入的解决方案将不胜感激 问题 Given s1 s2 s3 求是否s3由交错形成s1 and s2 例如 给定 s1 aabc
  • 有人知道针对低内存使用进行优化的 java.util.Map 实现吗?

    我查看了通常的地方 apache commons google 但找不到一个 它应该是开源的 几乎正在寻找一个基于链接列表的 用例是 10 000 张地图 不一定有很多值 它不需要按比例放大 因为当它变得太大时我可以转换它 一些数字 大小使
  • Java 堆空间与 GSON

    我正在使用 GSON 创建一些大的 JSON 文件 以从 GTFS Google Transit 创建自定义 JSON 问题是当我从对象类转换为 JSON 时 Gson gson new Gson String rutasJSON gson

随机推荐

  • Vue之插件的介绍

    简介 主要介绍Vue插件的概念 定义和使用 Vue的插件主要是用于增强功能 可以把它看作是一个工具库 可以提供很多强大的功能 比如一些强大的自定义指令 一些强大的工具方法 过滤器等 我们可以编写或者直接引入别人写的插件 就能获得强大的功能
  • odoo 权限

    创建安全组并分配用户 Odoo中的访问权限通过安全组成进行配置 给组指定权限 然后为组分配用户 每个功能区都有中枢应用所提供的基础安全组 在插件继承已有应用时 它们应对相应的组添加权限 参见本章稍后的向模型添加访问权限一节 在插件模块添中添
  • HDOJ 1058 Humble Numbers解题报告【DP】

    Humble Numbers 题目详见http acm hdu edu cn showproblem php pid 1058 开始拿到这个题目的时候还纠结了半天 英语很差的话这个题是不可能AC的 而我就是其中之一 Humber Numbe
  • spring-boot-maven-plugin报错的修改与版本号查看

    我报错的原因是因为没加版本号 版本号是多少 可以下个everything搜spring boot maven plugin 前面的号码就是版本号了
  • [转]出租车轨迹处理(二):时空分析

    接下来就要进行一些简单的分析了 今天的目标是如何对某一感兴趣区域进行出租车数据的时空分析 一 轨迹数据预处理 这一步在上一篇文章中已经有了介绍 步骤无非就是 1 使用pandas读取数据 import pandas as pd import
  • Matlab实现粒子群算法(附上完整仿真代码)

    粒子群算法 Particle Swarm Optimization PSO 是一种群体智能算法 通过模拟自然界中鸟群 鱼群等生物群体的行为 来解决优化问题 在PSO算法中 每个个体被称为粒子 每个粒子的位置表示解空间中的一个解 每个粒子的速
  • AVL树的插入与删除(均为递归实现)

    一 引言 AVL树是带有平衡条件的二叉查找树 这个平衡条件必须要容易保持 而且它必须保证树的深度是O logN 一颗AVL树是其每个节点的左子树和右子树的高度最多差一的二叉查找树 主要介绍插入算法和删除算法 二 AVL树的结点定义 type
  • (二)RK3566 Android11固件烧录

    上一篇 一 RK3566 Android11 系统编译 文章目录 1 固件包烧录步骤 2 固件统一打包 3 固件升级 1 固件包烧录步骤 烧录工具位置 RKTools windows AndroidTool AndroidTool Rele
  • e17 enlightenment 介绍及配置

    为什么要有一个窗口管理器 为什么一定要有一个桌面背景 甚至是标题栏 或是如果把一个应用程序如firefox当成桌面背景行不行 桌面能不能再快一点 我不想把资源浪费在那些用不到的地方 Linux那么多虚拟桌面 为什么我不能在一个桌面全屏运行一
  • python django框架ORM模型及ORM操作数据库 笔记

    ORM模型介绍 随着项目的越来越大 采用写原生SQL的方式在代码中会出现大量的SQL语句 那么问题就出现了 1 SQL语句重复利用率不高 越复杂的SQL语句条件越多 代码越长 会出现很多相近的SQL语句 2 很多SQL语句是在业务逻辑中拼出
  • 深度卷积神经网络中的patch

    转载 https blog csdn net wills798 article details 97974617 在阅读基于深度卷积神经网络的图像识别 分类或检测的文献时经常看到 patch 不是很能理解 后来就总结了一下 通过阅读 pat
  • 【深度学习——点云】PointNet++

    这篇文章发表于NIPS 2017 是在PointNet基础上的工作 论文地址 PointNet Deep Hierarchical Feature Learning on Point Sets in a Metric Space 1 Mot
  • vue实现简单轮播图

    实现思路 将vue的框架封装在function中 在界面刷新时调用 将要轮播的图片存放在data中 还有下面的列表也分别保存在data中的一个数组中 然后每隔一段时间进行自动切换的函数写在methods中 注意函数要调用的话 就要在生命周期
  • 如何fork GitHub上的官方仓库

    在GitHub中 fork表示复制一个仓库到你自己的GitHub账号下 创建一个独立的副本 通过fork操作 你可以在自己的副本中进行修改 改进和实验 而不会影响到原始仓库或其他人的工作 当你fork一个仓库时 GitHub将会为你创建一个
  • Spring Boot(二)SpringBoot是如何启动Spring容器源码

    SpringApplication run 调用SpringApplication run启动springboot应用 1 SpringApplication run Application class args 2 使用自定义Spring
  • 使用R语言绘制散点图

    文章目录 学习目标 学习内容 内容小结 学习目标 我们所采用的学习内容来自B站的Lizongzhang老师的R语言的学习分享 今天学习的主要内容是关于 绘制散点图 学习内容 下面是学习的主要内容 直接上代码 大家可以查看视频 我也的注释也比
  • 求阶乘的三种方法

    方法一 递归算法 include
  • 【手把手带你用pid算法控制电机】——(4)串级PID控制电机

    前言 1 该系列教程是基于stm32f103c8t6最小系统板的hal库开发 用最通俗易懂的方式手把手带你学会使用Pid算法的速度环 位置环以及速度位置串级pid 2 出这一期Pid系列教程的想法是前段时间我参加了一个比赛 要用到串级Pid
  • 第1课:三位一体定位法,让写作事半功倍

    做最懂技术的传播者 最懂传播的工程师 课程内容分析 本课程的目标是 通过对一系列问题的梳理 找到适合自己的输出状态 确定与理想输出状态之间存在的差距 以及采取什么办法 减少差距 知识要点 1 受众需要什么 省时间的内容 收敛 看过就走 教你
  • Netty源码剖析之HashedWheelTimer时间轮

    版本信息 JDK1 8 Netty all 4 1 38 Final 时间轮的介绍 我们知道钟表分为很多块 每时钟滴答一次就往前走一个块 而时间轮就是使用这个思想 如下图 上图总共分为8块 每过100ms就往前走一块 然后周而复始 此时 我