仿kafka实现java版时间轮

2023-11-18

系统定时、超时

在我们平时的项目开发中,会设置系统的超时时间,比如在http接口中设置超时时间,在定时调度中也会用到。在jdk的开发的实现Timer和ScheduledThreadPoolExecutor、DelayQueue定时调度中使用的是最小堆,我们知道最小堆的插入时间复杂度是log(n)。在kafka中,采用的是基于O(1)的时间轮算法,本节我们就使用java来模仿kafka层级时间轮。

时间轮简介

时间轮的实现思想是借鉴我们的钟表,秒针转动一圈,分钟移动一个,分钟转动一格,始终移动一格,在kafka中称为桶bucket。下面文章中称为槽。

在kafka中第一个槽默认一格表示1ms,第一个时间轮是20个槽,所以第一一个时间轮代表20ms。第二个时间轮的每一格式第一个时间轮的总时间,也就是20ms,所以第二个时间轮可表示的时间范围是400ms,依次类推,第三个时间轮可表示的时间范围是8s,第四个时间轮是160s等等。由于时间在向前推进,故一段时间后,第二个时间轮上的任务会向转移到第一个时间轮上,这样递进的方式,最终任务都会执行。
kafka中的每个槽表示一个TimerTaskList,每个任务加到这个TimerTaskList上,如下图中时间轮中每个槽都代表一个TimerTaskList。

1.任务TimerTask源码分析

TimerTask类表示一个要执行的任务,实现了Runnable接口

public abstract class TimerTask implements Runnable {

    public long delayMs; //表示当前任务延迟多久后执行(单位ms),比如说延迟3s,则此值为3000

    public TimerTask(long delayMs) {
        this.delayMs =  delayMs;
    }
    // 指向TimerTaskEntry对象,一个TimerTaskEntry包含一个TimerTask,TimerTaskEntry是可复用的
    private TimerTaskList.TimerTaskEntry timerTaskEntry = null;
  
    // 取消当前任务,就是从TimerTaskEntry移出TimerTask,并且把当前的timerTaskEntry置空
    public synchronized void cancel() {
        if(timerTaskEntry != null) {
            timerTaskEntry.remove();
        }
        timerTaskEntry = null;
    }

    //设置当前任务绑定的TimerTaskEntry
    public synchronized void setTimerTaskEntry(TimerTaskList.TimerTaskEntry entry) {
        if(timerTaskEntry != null && timerTaskEntry != entry) {
            timerTaskEntry.remove();
        }
        timerTaskEntry = entry;
    }

    public TimerTaskList.TimerTaskEntry getTimerTaskEntry() {
        return timerTaskEntry;
    }
}

2.任务包装类TimerTaskEntry

TimerTaskEntry是TimerTask的包装,实现了Compareable接口,用来比较连个任务的过期时间,以决定任务list插入的顺序

public static class TimerTaskEntry implements Comparable<TimerTaskEntry>{
        //包含一个任务
        public TimerTask timerTask;
        // 任务的过期时间,此处的过期时间设置的过期间隔+系统当前时间(毫秒)
        public Long expirationMs;
        
        // 当前任务属于哪一个列表
        private TimerTaskList list;
        // 当前任务的上一个任务,用双向列表连接
        private TimerTaskEntry prev;
        private TimerTaskEntry next;


        public TimerTaskEntry(TimerTask timerTask,Long expirationMs) {
            this.timerTask = timerTask;
            this.expirationMs = expirationMs;
            // 传递进来任务TimerTask,并设置TimerTask的包装类
            if(timerTask != null) {
                timerTask.setTimerTaskEntry(this);
            }
        }
      
        // 任务的取消,就是判断任务TimerTask的Entry是否是当前任务
        public boolean cancel() {
            return timerTask.getTimerTaskEntry() != this;
        }
      
       // 任务的移出
        public void remove() {
            TimerTaskList currentList = list;
            while(currentList != null) {
                currentList.remove(this);
                currentList = list;
            }
        }
        // 比较两个任务在列表中的位置,及那个先执行
        @Override
        public int compareTo(TimerTaskEntry that) {
            return Long.compare(expirationMs,that.expirationMs);
        }
    }

3.每个槽中的任务列表

在时间轮中每个槽代表一个列表,即TimerTaskList,每个TimerTaskList中包含多个TimerTaskEntry,并且用双向列表链接。TimerTaskList实现了Delayed接口,用于返回剩余的时间,把上层时间轮的任务移动位置。

public class TimerTaskList implements Delayed {
    //当前列表中包含的任务数
    private AtomicInteger taskCounter;
    // 列表的头结点
    private TimerTaskEntry root;
    // 过期时间
    private AtomicLong expiration = new AtomicLong(-1L);


    public TimerTaskList(AtomicInteger taskCounter) {
        this.taskCounter = taskCounter;
        this.root =  new TimerTaskEntry(null,-1L);
        root.next = root;
        root.prev = root;
    }

    // 给当前槽设置过期时间
    public boolean setExpiration(Long expirationMs) {
        return expiration.getAndSet(expirationMs) != expirationMs;
    }

    public Long getExpiration() {
        return expiration.get();
    }

    // 用于遍历当前列表中的任务
    public synchronized  void foreach(Consumer<TimerTask> f) {
        TimerTaskEntry entry = root.next;
        while(entry != root) {
            TimerTaskEntry nextEntry = entry.next;
            if(!entry.cancel()) {
                f.accept(entry.timerTask);
            }
            entry = nextEntry;
        }
    }
  
   // 添加任务到列表中
    public void add(TimerTaskEntry timerTaskEntry) {
        boolean done = false;
        while(!done) {
            timerTaskEntry.remove();

            synchronized (this) {
                synchronized (timerTaskEntry) {
                    if(timerTaskEntry.list == null) {
                        TimerTaskEntry tail = root.prev;
                        timerTaskEntry.next = root;
                        timerTaskEntry.prev = tail;
                        timerTaskEntry.list = this;
                        tail.next = timerTaskEntry;
                        root.prev = timerTaskEntry;
                        taskCounter.incrementAndGet();
                        done = true;
                    }
                }
            }
        }
    }

    //移出任务
    private synchronized void remove(TimerTaskEntry timerTaskEntry) {
        synchronized (timerTaskEntry) {
            if(timerTaskEntry.list == this) {
                timerTaskEntry.next.prev = timerTaskEntry.prev;
                timerTaskEntry.prev.next = timerTaskEntry.next;
                timerTaskEntry.next = null;
                timerTaskEntry.prev = null;
                timerTaskEntry.list = null;
                taskCounter.decrementAndGet();
            }
        }
    }

    public synchronized void flush(Consumer<TimerTaskEntry> f) {
        TimerTaskEntry head = root.next;
        while(head != root) {
            remove(head);
            f.accept(head);
            head = root.next;
        }
         expiration.set(-1L);
    }
    //获得当前任务剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(),0),TimeUnit.MICROSECONDS);
    }
    
    @Override
    public int compareTo(Delayed d) {
        TimerTaskList other = (TimerTaskList) d;
        return Long.compare(getExpiration(),other.getExpiration());
    }
}

4.时间轮结构

时间轮TimeWheel代表一层时间轮,即第一层时间轮表示20ms,主要功能是添加任务和,驱动时间轮向前。

public class TimingWheel {

    private Long tickMs;  //每一个槽表示的时间范围
    private Integer wheelSize; // 时间轮大小,即每一层时间轮的大小
    private Long startMs; // 系统的启动时间
    private AtomicInteger taskCounter;  // 当前层任务数
    private DelayQueue<TimerTaskList> queue; //延迟队列,用于从队列取每个任务列表

    private Long interval; //每一层时间轮代表的时间
    private List<TimerTaskList> buckets;  // 每一层的每一个槽中的时间任务列表
    private Long currentTime;  // 修正后的系统启动时间
  
    private TimingWheel overflowWheel = null;  // 上一层时间轮

    public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.startMs = startMs;
        this.taskCounter = taskCounter;
        this.queue = queue;
        interval = tickMs * wheelSize;
        currentTime = startMs - (startMs % tickMs); //当前时间,往前推

        buckets = new ArrayList<>(wheelSize);
        for(int i = 0;i < wheelSize;i++) {
            buckets.add(new TimerTaskList(taskCounter));  //创建每一个槽中的列表
        }
    }

    // 创建上层时间轮
    public synchronized void addOverflowWheel() {
        if(overflowWheel == null) {
            overflowWheel = new TimingWheel(
                    interval,  // 此处interval即表示上一层时间轮表示的范围
                    wheelSize,
                    currentTime,
                    taskCounter,
                    queue
            );
        }
    }
  
  // 添加任务
    public boolean add(TimerTaskList.TimerTaskEntry timerTaskEntry) {
        Long expiration = timerTaskEntry.expirationMs;
       
        Long thisTime = currentTime + tickMs;
        // 任务是否已经取消,取消则返回
        if(timerTaskEntry.cancel()) {
            return false;
        // 当前任务是否已经过期,如果过期则返回false,要立即执行
        }else if(expiration < currentTime + tickMs) {
            return false;
        // 判断当前任务能否在添加到当前时间轮
        }else if(expiration < currentTime + interval) {
           
            Long virtualId = expiration / tickMs;  
            // 计算当前任务要分配在哪个槽中
            int whereBucket = (int) (virtualId % wheelSize);
            TimerTaskList bucket = buckets.get((int)(virtualId % wheelSize));

            bucket.add(timerTaskEntry);

            long bucketExpiration = virtualId * tickMs;
            //更新槽的过期时间,添加入延迟队列
            if(bucket.setExpiration(virtualId * tickMs)) {
                queue.offer(bucket);
            }
            return true;
        }else {
          //添加任务到高层时间轮
            if(overflowWheel == null) addOverflowWheel();
            return overflowWheel.add(timerTaskEntry);
        }
    }

    // 向前驱动时间
    public void advanceClock(Long timeMs) {
        if(timeMs >= currentTime + tickMs) {
            currentTime = timeMs - (timeMs % tickMs);

            if(overflowWheel != null) {
                overflowWheel.advanceClock(currentTime);
            }
        }
    }
}

5. 时间轮接口

  • kafka中提供了Timer接口,用于对外提供调用,分别是Timer#add添加任务;Timer#advanceClock驱动时间; Timer#size时间轮中总任务数;Timer#shutdown停止时间轮
public interface Timer {
    void add(TimerTask timerTask);
    boolean advanceClock(Long timeoutMs) throws Exception;
    int size();
    void shutdown();
}
  • Timer的实现类是SystemTimer
public class SystemTimer implements Timer {

    private String executorName;
    private Long tickMs = 1L;
    private Integer wheelSize = 20;
    private Long startMs = System.currentTimeMillis();
    //用来执行TimerTask任务
    private ExecutorService taskExecutor =
            Executors.newFixedThreadPool(1,(runnable) -> {
                Thread thread = new Thread(runnable);
                thread.setName("executor-" + executorName);
                thread.setDaemon(false);
                return thread;
            });
    //延迟队列
    private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
    private AtomicInteger taskCounter = new AtomicInteger(0);
    private TimingWheel timingWheel;

    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    // 用来执行时间轮的重新排列,及上一个槽中的任务列表被执行后,后面的槽中的任务列表移动
    private Consumer<TimerTaskEntry> reinsert = (timerTaskEntry) -> addTimerTaskEntry(timerTaskEntry);

    public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
        this.executorName = executorName;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.startMs = startMs;
        this.timingWheel = new TimingWheel(
                tickMs,
                wheelSize,
                startMs,
                taskCounter,
                delayQueue
        );
    }

    // 可能会多个线程操作,所以需要加锁
    @Override
    public void add(TimerTask timerTask) {
        readLock.lock();
        try{
            addTimerTaskEntry(new TimerTaskEntry(timerTask,timerTask.delayMs + System.currentTimeMillis()));
        }finally {
            readLock.unlock();
        }
    }

    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {      // 往时间轮添加任务
        if(!timingWheel.add(timerTaskEntry)) {
            // 返回false并且任务未取消,则提交当前任务立即执行。
            if(!timerTaskEntry.cancel()) {
                taskExecutor.submit(timerTaskEntry.timerTask);
            }
        }
    }


    // 向前驱动时间轮
    @Override
    public boolean advanceClock(Long timeoutMs) throws Exception{
        // 使用阻塞队列获取任务
        TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if(bucket != null) {
            writeLock.lock();
            try{
                while(bucket != null) {
                    timingWheel.advanceClock(bucket.getExpiration());
                    // 驱动时间后,需要移动TimerTaskList到上一个槽或者从上一层移动到本层
                    bucket.flush(reinsert);
                    bucket = delayQueue.poll();
                }
            }finally {
                writeLock.unlock();
            }
            return true;
        }else {
            return false;
        }
    }

    @Override
    public int size() {
        return taskCounter.get();
    }

    @Override
    public void shutdown() {
        taskExecutor.shutdown();
    }
}

6. 时间轮接口

public class SystemTimerTest {
    //驱动时间轮向前的线程
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    public static  SystemTimer timer = new SystemTimer("test",1000L,5,System.currentTimeMillis());


    public static void runTask() throws Exception {
        for(int i = 0;i < 10000;i+= 1000) {
            // 添加任务,每个任务间隔1s
            timer.add(new TimerTask(i) {
                @Override
                public void run() {
                    System.out.println("运行testTask的时间: " + System.currentTimeMillis());
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        runTask();

        executorService.submit(() -> {
            while(true) {
                try {
                    // 驱动时间轮线程间隔0.2s驱动
                    timer.advanceClock(200L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });


        Thread.sleep(1000000);
        timer.shutdown();
        executorService.shutdown();
    }
}


 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

仿kafka实现java版时间轮 的相关文章

随机推荐

  • C#中Ilist与list的区别小结

    常见问题 Ilist lt gt 本身只是一个泛型接口 既然是接口当然不能实例化 只能用如下方法 IList
  • python的print打印颜色设置

    1 顺序 显示方式 前景颜色 背景颜色 2 顺序非固定 但尽量按默认书写方式 3 也可以在input中输出使用 4 格式 print 033 显示方式 前景颜色 背景颜色m 033 0m 显示方式 意义 显示方式 默认 0 高亮显示 1 下
  • 读书笔记:Bi-Directional Cascade Network for Perceptual Edge Detection

    目录 摘要 1 介绍 2 相关工作 3 方法 3 1公式 3 2BDCN的架构 3 3网络训练 4 实验 4 1数据集 摘要 利用多尺度表示对于提高不同尺度对象的边缘检测至关重要 为了利用多尺度提取边缘 我们提出了一种全向级联网络 BDCN
  • 仿网页列表的实现

    序言 实现思路 代码实现 效果展现 总结 序言 最近遇到了一项需求 用Android实现网页中的列表效果 首先我便想到了用ListView的方式来实现 由于网页的表格数据通常都是以一行的形式展现的 因此 我把需要展现的数据放在了一行 可是需
  • python中的tcp/ip连接

    参考博客 https blog csdn net c123 sensing article details 81563702 服务端的代码 usr bin python coding UTF 8 import socket sock soc
  • Android很好看的登陆界面(包含详细代码)

    一 这是我自己在做一个小项目的时候做的界面 在这里分享给大家 其实没什么东西 主要是利用了Material Design设计风格 1 在这里给大家安利一下Material Design设计风格 这个组件库的里面的组件和Android原生的组
  • 【SSL证书安全】

    SSL证书介绍 一种数字证书 也被称为 https证书 CA证书 安全证书 服务器证书 或 SSL证书 一般新申请的证书中 由三部分组成 分别为 CA证书 公钥 私钥 术语定义 发送内容 私钥加密过的内容 数字签名 确认发送内容的完整性 h
  • Cobalt Strike(学习笔记)

    Cobalt Strike简介 Cobalt Strike 是一款GUI的框架式渗透工具 集成了端口转发 服务扫描 自动化溢出 多模式端口监听 win exe木马生成 win dll木马生成 java木马生成 office宏病毒生成 木马捆
  • Linux实用工具

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 1 Windows下同步Linux文件 Linux安装Samba和配置 场景需求 安装了Ubuntu在虚拟机上 但是代码编辑或者其它更多的操作的时候 还是习惯在window
  • @Autowired注解的实现原理

    Autowired注解用法 在分析这个注解的实现原理之前 我们不妨先来回顾一下 Autowired注解的用法 将 Autowired注解应用于构造函数 如以下示例所示 public class MovieRecommender privat
  • 3dmax2014卸载/安装失败/如何彻底卸载清除干净3dmax2014注册表和文件的方法

    3dmax2014提示安装未完成 某些产品无法安装该怎样解决呢 一些朋友在win7或者win10系统下安装3dmax2014失败提示3dmax2014安装未完成 某些产品无法安装 也有时候想重新安装3dmax2014的时候会出现本电脑win
  • 【电子电路】逻辑章

    人不能 至少不应该 一天速成电子电路 不全 漏掉的那部分是常识或者 真的是眼睛不好了 BCD码 即8421码 2421码 余3码 平常的二进制就是8421码 其余以此类推 符号位 1100 正 1101 负 可靠性编码 奇偶校验码 奇校验
  • H5页面与vue的客户端交互

    工作中经常遇到一些奇怪的东西 我有个这样的需求 就是我写的vue项目被嵌套在别的h5项目页面下 so进入我的页面前需要判断他的h5页面有没有登录 这时候就需要我的客户端页面调用h5页面的登录方法 客户端页面 ios 安卓 两种都得交互h5页
  • 海康威视系统未连接服务器,ivms-4200客户端登入不了云服务器

    ivms 4200客户端登入不了云服务器 内容精选 换一换 本章节为您介绍以下内容 准备弹性云服务器作为GDS服务器在使用GDS导入导出数据之前 需要准备一台或多台与GaussDB DWS 集群在相同VPC内的Linux弹性云服务器 简称E
  • Redis使用总结(四、处理延时任务)

    引言 在开发中 往往会遇到一些关于延时任务的需求 例如 生成订单30分钟未支付 则自动取消 生成订单60秒后 给用户发短信 对上述的任务 我们给一个专业的名字来形容 那就是延时任务 那么这里就会产生一个问题 这个延时任务和定时任务的区别究竟
  • 简单讲解一下什么是ATT&CK框架

    点击 仙网攻城狮 关注我们哦 不当想研发的渗透人不是好运维 让我们每天进步一点点 简介 ATT CK框架由 MITRE 安全组织提出并列出了APT 高级可持续威胁攻击 的14个阶段涉及到206个安全技术点上千种攻击 检测手段 基本覆盖所有网
  • Mip-Nerf三维重建代码复现教程——环境配置

    Mip Nerf三维重建Pytorch使用Pycharm运行0基础教程 项目论文 项目Github 你好 这里是 出门吃三碗饭 本人 本文章接下来将介绍如何从0运行2020会议Mip Nerf的Pytorch版本 让你自己动手渲染第一个三维
  • 网络安全应急响应----1、应急响应简介

    目录 1 应急响应流程 2 PDCERT应急响应方法模型 3 应急响应常见事件 4 应急响应分析流程 网络安全应急响应 针对已经发生的或可能发生的安全事件进行监控 分析 协调 处理 保护资产安全 1 应急响应流程 2 PDCERT应急响应方
  • uni-App聊天功能的源码

    前言 泡泡IM uniapp版聊天源码是一套完整的基于uniapp开发的聊天软件源码 可编译成微信小程序 安卓 IOS APP聊天软件 H5网页聊天室 uniapp聊天源码未加密 无外部依赖 可私有化部署 可二次开发 文档全面 接口丰富 方
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用