Rust tokio::select学习杂记

2023-05-16

Rust tokio::select学习杂记

  1. 前言

Linux系统有select/poll/epoll等,主要用于监控各种fd上发生的各种event, 从而识别派发处理。golang语言中也有一个select,作用相似,主要监控channel上发生的可读可写event。 对于rust tokio/async_std/crossbeam/futures等也需要一个select去统一集中监控, 本笔记只针对tokio, 所以专门学习tokio crate提供的select!宏。

官方文档开篇对select!的定义:

Wait on multiple concurrent branches, returning when the first branch completes, cancelling the remaining branches.

 

本人水平和精力有限,加之考证不详,故此难免谬误,粗鄙杂记随笔,只做抛砖引玉之举,切望见谅!

 

2.要点

 

(1)The select! macro must be used inside of async (functions, closures, and blocks).

(2) 每一个async expression and handler code 都是在当前task中执行的, 一旦block or long running 当前task所在的thread, 则select!没法检查其他branch case了!故此需避免此种情况,也可以调用tokio::spawn去并行执行,然后把join handle交给select!去监控即可。

(3)else branch是必须的,可以避免当所有branch disable时, select! panic.

select!panics if all branches are disabled **and** there is no providedelsebranch. A branch is disabled when the providedifprecondition returnsfalse**or** when the pattern does not match the result of.

(4) select!聚合所有enable branch的async expression并发执行, 一旦有执行完毕返回者, 则立即进行pattern模式匹配, 若匹配成功, 则执行handler code

(5)select!文档开篇就对其有明确的定义,等待所有branch并发执行, 当第一个branch完成时,取消剩余branch async expression的执行!这就产生一个问题,如果你的async expression磨磨唧唧block/long running在那,不及时执行完毕返回,一旦其他branch首先执行完毕返回, 则select!首先模式匹配之, 一旦成功, 则本轮其他未执行完毕的async expression则被取消,最终导致这个branch一直不会成功,就像不存在!所以timeout那样持续性的future不适合用select!检测,selecct!拒绝他!还有async expression和handler code必须是那种即刻执行完毕返回的代码块,不可以sleep/delay/timeout/wait some thing/long runing等等, 因为他会剥夺select!检查其他branch的机会!

(6)切记区分“并发”和“并行”的不同!select!只是“并发”执行branch,并非"并行".

 

3.使用模式


loop {
    //每轮loop遍历重新评估每一个branch 所以一个branch不会一直disable.
    tokio::select! {
        <pattern> = <async expression> (, if <precondition>)? => {
            //handler code 
            //the pattern failed or the precondition return false , then the branch 被认为disable branch
        },
        //...
         else => {println!("not match");},
        //当所有branch都是disable branch时, select!去执行else branch, 若是没有else branch , 则panic.
    }
}  

precondition 若为false, 则disable 此branch case, but async expression is still evaluated, but the resulting future is not polled.大意为:只是评估async expression得出一个future, 但是不会真正去执行这个future. precondition 若为true, 则正常run 此future.

pattern 用于匹配async expression.await的执行结果.

async expression 一般代表一个可以后缀.await来实际执行的代码块,如async fn/block等.

4.select! 完整执行流程

(1)Evaluate all provided precondition expressions. If the precondition returns false, disable the branch for the remainder of the current call to select!. Re-entering select! due to a loop clears the "disabled" state. (2)Aggregate the async expressions from each branch, including the disabled ones. If the branch is disabled, async expression is still evaluated, but the resulting future is not polled. (3)Concurrently await on the results for all remaining async expressions. (4)Once an async expression returns a value, attempt to apply the value to the provided pattern, if the pattern matches, evaluate handler and return. If the pattern does not match, disable the current branch and for the remainder of the current call to select!. Continue from step 3。 (5)If all branches are disabled, evaluate the else expression. If none is provided, panic.

详情请参看: https://docs.rs/tokio/0.2.13/tokio/macro.select.html

 

5.code example


use tokio::time::{self, Duration,delay_for,timeout};
use tokio::stream::{self, StreamExt};
use tokio::sync::{oneshot,mpsc,broadcast};
use tokio::task;
​
async fn some_computation(input: u32) -> String {
    format!("the result of computation {}", input)
}
​
async fn some_async_work() {
    // do work
    //delay_for(Duration::from_millis(100000)).await;
    //只需注释掉上一行代码,并追加一行无线循环代码, 即可验证select!在当前同一个task所在的thread中并发执行
    //所有<async expression>, 一旦当前thread被block住,则select!不能再check其他branch的<async expression>了
    //所以切记<async expression>中不要有block当前线程的代码!
}
​
#[tokio::main]
async fn main() {
    //time::delay
    let mut delay = time::delay_for(Duration::from_millis(5));
    //stream
    let mut stream1 = stream::iter(vec![1, 2, 3]);
    //oneshot
    let (tx1, mut rx1) = oneshot::channel();
    tokio::spawn(async move {
        tx1.send("first").unwrap();
    });
    let mut a = None;
    //mpsc
    let (mut tx2, mut rx2) = mpsc::channel(100);
    tokio::spawn(async move {
        for i in 0..10 {
            let res = some_computation(i).await;
            tx2.send(res).await.unwrap();
        }
    });
    let mut done = false;
    //broadcast 
    let (tx3, mut rx3) = broadcast::channel(16);
    let mut rx4 = tx3.subscribe();
    tx3.send(10).unwrap();
    tx3.send(20).unwrap();
    tokio::spawn(async move {
        assert_eq!(rx4.recv().await.unwrap(), 10);
        assert_eq!(rx4.recv().await.unwrap(), 20);
    });
    //time::interval
    let mut interval = time::interval(Duration::from_millis(2));
    //join handle
    let mut join_done = false;
    let mut join_handle: task::JoinHandle<u8> = task::spawn(async {
        // some work here
        delay_for(Duration::from_millis(1)).await;
        88
    });
    //time::timeout
    //let mut to = timeout(Duration::from_millis(5), some_async_work());
​
    loop {
        tokio::select! {
            _ = &mut delay => {
                println!("delay reached");
                break;
            },
           /* _ = &mut to => {
                println!("operation timed out");
                break;
            },*/
            ret_code=&mut join_handle ,if !join_done => {
                join_done = true;
                println!("join handle case: {:?}", ret_code);
            },
            _= interval.tick() => {
                println!("operation interval");
            },
            _ = some_async_work() => {
                println!("operation completed");
                //delay_for(Duration::from_millis(100000)).await;
                //此处加上delay_for可用于验证, <handler code>一旦有block/long running 当前所在task的代码
                //则select!无法再去check其他branch了, 所以切记避免之!!!
            },
            Some(v) = stream1.next() => { println!("stream: {}", v);},
            v1 = (&mut rx1), if a.is_none()  =>  {
                println!("oneshot : {:?}", v1);a = v1.ok();
            },
            v2 = rx2.recv(), if !done  => {
                println!("mpsc: {:?}", v2);
                 if v2.is_none() { done = true; }
            },
            v3 = rx3.recv() => {
                println!("broadcast: {:?}", v3);
            },
            else => {
                println!("not match");
            },
        }
    }
}  

 

tokio::select!和golang::select还是有很大不同的, 后者主要监控channel, 而前者用于监控async expression,不限于channel!也不是监控channel的可读可写状态!不同于一般意义上的io event poll, 本质上讲select!就是每次同时并发执行所有enabel branch的async expression.await,一旦其中某个有结果,则接着对结果执行模式匹配, 成功了则执行handler code.

现在tokio::select!的编译报错相当不友好,一处出错整体泛红,令人无从下手。个人体会其可用性易用性和友好性远不如golang::select,确实需要打磨。

 

https://docs.rs/tokio/0.2.13/tokio/macro.select.html

 

5.后记

tokio::select!和golang::select还是有很大不同的, 后者主要监控channel, 而前者用于监控async expression,不限于channel!也不是监控channel的可读可写状态!不同于一般意义上的io event poll, 本质上讲select!就是每次同时并发执行所有enabel branch的async expression.await,一旦其中某个有结果,则接着对结果执行模式匹配, 成功了则执行handler code.

现在tokio::select!的编译报错相当不友好,一处出错整体泛红,令人无从下手。个人体会其可用性易用性和友好性远不如golang::select,确实需要打磨。

6.疑问

(1) tokio::select!只是描述接受async expression, 但是实验发现并非所有的async expression都被接受,比如:tokio::time::timeout, 我是在rust stable 1.42版本测试的, 有时间我在慢慢研究吧。

(2)tokio::select!对于每一个branch case, 其实主要检测async expression.await是否执行返回, 那么对于channel 而言,容易检测已读已写! 对于可读可写, 固然可以通过检查channel的len(), is_full(), is_empty()来判断, 但是当handler code被执行时,之前的判断很可能已经不成立!产生race condition问题,不知是否送多虑了???

(3) 虽然其文档中明确描述select!随机挑选一个check, 但参看其文档中的执行流程,分明是先来先得, 即那个async expression先执行完毕返回,select!就先check它, 从实际测试代码的执行输出来看也体现如此!比较扎堆, async expression的并发随机执行由tokio::runtime::executor来保证, 但是select!文档却说随机挑选一个branch进行check ! 故此对其随机性和公平性我却有些疑惑了!

7.参考资料

https://docs.rs/tokio/0.2.13/tokio/macro.select.html

8.test code: https://github.com/yujinliang/rust_learn/tree/master/tokio/select_macro

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rust tokio::select学习杂记 的相关文章

随机推荐

  • PyQt6: 多网卡适配器的选择与显示(GPT4帮写)

    PyQt6 多网卡适配器的选择与显示 1 背景2 Python获取本机网卡适配器信息3 PyQT6 UI显示网卡信息4 PyQT6 后台处理 xff1a ButtonComboBox 附 xff1a GPT Output xff1a 博主热
  • Pyqt5的安装(Visual Studio Code)

    Pyqt5的安装 xff08 Visual Studio Code xff09 我的第一个博客就拿我现在正在准备的比赛来写吧 xff01 首先 xff0c 我们得安装一个 xff0c Visual Studio Code 安装的网址如下 x
  • [ WARN:0] global C:\projects\opencv-python\opencv\modules\videoio\src\cap_msmf.cpp (674) SourceReade

    capture 61 cv2 VideoCapture 0 出现警告 xff1a WARN 0 global C projects opencv python opencv modules videoio src cap msmf cpp
  • 租用游艇问题(动态规划)

    问题描述 xff1a 长江游艇俱乐部在长江上设置了n个游艇出租战1 xff0c 2 xff0c xff0c n 游客可以在这些游艇出租站租用游艇 xff0c 并在下游的任何一个游艇出租站归还游艇 游艇出租站 i 到游艇出租站 j 之间的租金
  • 对照JAVA学习Rust(07)--类和结构体

    1 Java类和Rust结构体 对象体结构 xff0c Java的类定义可包括属性和函数 xff0c 或常量 而Rust 结构体里只有属性字段 xff0c 函数需要在关联到架构体使用impl 结构体名如以下例子 JavaRust publi
  • HDU 3700 Cat

    Cat Time Limit 2000 1000 MS Java Others Memory Limit 32768 32768 K Java Others Total Submission s 451 Accepted Submissio
  • 解决supervisorctl引起的java进程oom一启动就被killed

    使用 supervisor 管理进程 当发现 某一个java进程 已启动就被killed 一直以为是程序哪里或者docker的内存有问题 原来是supervisor的一个命令 我的启动 java jar 放在了 某个 bash脚本中 sup
  • 以太网的帧间隙、前导码、帧开始定界符

    每个以太帧之间都要有帧间隙 xff08 Interframe Gap xff09 xff0c 即每发完一个帧后要等待一段时间才能再发 另外一个帧 xff0c 以便让帧接收者对接收的帧作必要的处理 xff08 如调整缓存的指针 更新计数 通知
  • Error: L6218E: Undefined symbol LED_Init (referred from main.o).

    在使用Keil软件的过程中 xff0c 经常会出现这种报错 xff0c undefined symbol xxxx referred from xxx o 这个时候大多是 c文件没有被包含在相应的路径中 xff0c o文件在我们的工程中没有
  • 让老照片重现光彩:Bringing Old Photos Back to Life(实战)

    Bringing Old Photos Back to Life 香港城市大学和微软亚洲研究院的 让老照片重现光彩 xff08 Bringing Old Photos Back to Life xff09 项目侧重于对老照片进行划痕修复和人
  • GitLab配置ssh key:gitlab add an ssh key

    一 检查 创建SSH Key 在用户主目录下 xff0c 看看有没有 ssh目录 xff0c 如果有 xff0c 再看看这个目录下有没有id rsa和id rsa pub这两个文件 xff0c 如果已经有了 xff0c 可直接跳到下一步 如
  • 请求头(request headers)和响应头(response headers)解析

    请求头 xff08 request headers xff09 POST user signin HTTP 1 1 请求方式 文件名 http版本号 Host passport cnblogs com 请求地址 Connection kee
  • Tableau基础操作——连接数据源

    Tableau基础操作 连接数据源 Tableau基础操作 连接数据源 前言 随着大数据时代的到来 xff0c 借助于数据分析工具深入分析并可视化呈现变得越来越重要 而Tableau以其低功能强大且学习成本低被越来越多的企业所使用 一 Ta
  • linux下休眠/待机命令

    if you cat sys power state mem disk you can echo mem gt sys power state 这相当于待机 echo disk gt sys power state 这相当于休眠 from
  • 从零开始离线安装k8s集群

    本文主要用于在内网 xff08 离线 xff09 环境安装k8s集群 xff1b linux环境 centos7 6 主要步骤有 xff1a 安装docker创建dokcer 私有镜像库 registry安装kubernetes安装flan
  • 虚拟机中的Linux系统无法识别U盘

    问题描述 xff1a 将U盘插入到电脑USB接口 xff0c 然后在虚拟机的右下角选择让U盘从Windows上断开 xff0c 链接到虚拟机上来 链接上虚拟机后 xff0c 在Linux系统中输入命令fdisk l命令 xff0c 却只有
  • C语言丨关键字enum用法详解,看这篇就够了

    一 关键字enum的定义 enum是C语言中的一个关键字 xff0c enum叫枚举数据类型 xff0c 枚举数据类型描述的是一组整型值的集合 xff08 这句话其实不太妥当 xff09 xff0c 枚举型是预处理指令 define的替代
  • CCF CSP 201512-3 画图

    字符串基础题 问题描述 用 ASCII 字符来画图是一件有趣的事情 xff0c 并形成了一门被称为 ASCII Art 的艺术 例如 xff0c 下图是用 ASCII 字符画出来的 CSPRO 字样 lt 本题要求编程实现一个用 ASCII
  • fails sanity check错误的解决方法

    fails sanity check的解决方法 问题原因 xff1a 编译器缺乏必要的package xff1a 解决办法 xff1a 运行yum install glibc headers gcc c 43 43 即可解决
  • Rust tokio::select学习杂记

    Rust tokio select学习杂记 前言 Linux系统有select poll epoll等 xff0c 主要用于监控各种fd上发生的各种event 从而识别派发处理 golang语言中也有一个select xff0c 作用相似