如何使用 mpsc 通道在线程之间创建环形通信?

2024-04-10

我想生成 n 个能够与环形拓扑中的其他线程通信的线程,例如线程 0 可以向线程 1 发送消息,线程 1 可以向线程 2 发送消息,等等,线程 n 可以向线程 0 发送消息。

这是我想用 n=3 实现的示例:

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();

let child0 = thread::spawn(move || {
    tx0.send(0).unwrap();
    println!("thread 0 sent: 0");
    println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
    tx1.send(1).unwrap();
    println!("thread 1 sent: 1");
    println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
    tx2.send(2).unwrap();
    println!("thread 2 sent: 2");
    println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});

child0.join();
child1.join();
child2.join();

在这里,我在循环中创建通道,将它们存储在向量中,对发送者重新排序,将它们存储在新向量中,然后生成每个线程,每个线程都有自己的发送者-接收者(tx1/rx0、tx2/rx1 等)对。

const NTHREADS: usize = 8;

// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
    (0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();

// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

let mut children = Vec::new();
for i in 0..NTHREADS {
    let (tx, rx) = channels_ring.remove(i);

    let child = thread::spawn(move || {
        tx.send(i).unwrap();
        println!("thread {} sent: {}", i, i);
        println!("thread {} recv: {:?}", i, rx.recv().unwrap());
    });

    children.push(child);
}

for child in children {
    let _ = child.join();
}

这不起作用,因为无法复制 Sender 来创建新向量。 但是,如果我使用 refs (&发件人):

let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            &channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

我无法生成线程,因为std::sync::mpsc::Sender<i32>不能在线程之间安全地共享。


Senders and Receivers 无法共享,因此您需要move他们进入各自的线程。这意味着将它们从Vec否则消耗Vec迭代时 - 即使作为中间步骤,向量也不允许处于无效状态(有孔)。迭代向量into_iter将通过消耗它们来实现这一点。

可以使用一个小技巧让发送者和接收者在循环中配对,即创建两个向量;发送者之一和接收者之一;然后旋转一个,以便每个向量中的相同索引将为您提供所需的对。

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

fn main() {
    const NTHREADS: usize = 8;

    // create n channels
    let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
        (0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();

    // move the first sender to the back
    senders.rotate_left(1);

    let children: Vec<_> = senders
        .into_iter()
        .zip(receivers.into_iter())
        .enumerate()
        .map(|(i, (tx, rx))| {
            thread::spawn(move || {
                tx.send(i as i32).unwrap();
                println!("thread {} sent: {}", i, i);
                println!("thread {} recv: {:?}", i, rx.recv().unwrap());
            })
        })
        .collect();

    for child in children {
        let _ = child.join();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 mpsc 通道在线程之间创建环形通信? 的相关文章

  • 当函数中的模式匹配采用 &self 或 &mut self 时,如何避免使用 ref 关键字?

    铁锈书称为ref关键词 遗产 https doc rust lang org book ch18 03 pattern syntax html legacy patterns ref and ref mut 因为我想遵循隐含的建议来避免re
  • std::map 只读操作的线程安全

    我有一个 std map 用于将值 字段 ID 映射到人类可读的字符串 当我的程序在任何其他线程启动之前启动时 该映射会被初始化一次 之后就不会再被修改 现在 我为每个线程提供了这个 相当大的 映射的自己的副本 但这显然是内存使用效率低下
  • 返回一个dispatch_async获取的变量[重复]

    这个问题在这里已经有答案了 基本上 一个方法需要返回一个在dispatch async中获取的NSDictionary 这是我尝试过的 NSDictionary fetchNSDictionary dispatch queue t Queu
  • 我对线程失去了理智

    我想要这个类的对象 public class Chromosome implements Runnable Comparable
  • C++11 动态线程池

    最近 我一直在尝试寻找一个用于线程并发任务的库 理想情况下 是一个在线程上调用函数的简单接口 任何时候都有 n 个线程 有些线程比其他线程完成得更快 并且到达的时间不同 首先我尝试了 Rx 它在 C 中非常棒 我还研究了 Blocks 和
  • C#中为线程指定特殊的cpu

    我有 2 个线程 我想告诉其中一个在第一个 cpu 上运行 第二个在第二个 cpu 上运行 例如在具有两个 cpu 的机器中 我怎样才能做到这一点 这是我的代码 UCI UCIMain new UCI Thread UCIThread ne
  • 如何从 Vector 创建非消耗迭代器

    情况 我有一种情况 我想调用定义在Iterator函数参数的特征 我想调用它的函数接受一个类型的参数 该类型是trait called VecLike 该函数称为get all matching rules get all matching
  • 如何限制Erlang VM(BEAM)使用的核心数量?

    我正在具有 2 个四核 Xeon E5520 2 2GHz 24 0GB RAM 和 Erlang R15B02 启用 SMP 的节点上运行实验 我想知道是否可以限制Erlang VM使用的核心数量 以便我可以暂时禁用一些核心并逐步增加数量
  • “功能性”Rust 对性能有哪些影响?

    我正在关注 Rust 轨道运动 io https exercism io 我有相当多的 C C 经验 我喜欢 Rust 的 功能 元素 但我担心相对性能 我解决了 行程编码 问题 https exercism io tracks rust
  • Android 为什么这不会抛出错误的线程异常?

    我的印象是视图只能从主线程操作 但是 为什么这不会崩溃 public class MainActivity extends Activity TextView tv Override protected void onCreate Bund
  • 终结器线程的范围是什么 - 每个应用程序域或每个进程?

    根据我的所有阅读 应该有一个 GC 线程来调用所有终结器 现在的问题是这个 一个 线程的范围是什么 每个进程或每个应用程序域 因为域的整体目的是在一个进程空间中分离并创建 独立 的不同应用程序 I read here http dn cod
  • 调试 Java InterruptedException,即查找原因

    在调试Android应用程序时 有时中断异常发生并使应用程序崩溃 我已经能够在默认异常处理程序上设置断点 但调用堆栈不提供信息 at java util concurrent locks AbstractQueuedSynchronizer
  • 错误[E0554]:#![功能]可能无法在稳定发布通道上使用无法使用货物安装赛车

    我正在尝试使用 Cargo 安装 Racer 所以我执行了命令cargo install racer在终端中并导致错误 error E0554 feature may not be used on the stable release ch
  • rust-analyzer 未能发现 vscode 中的工作区

    我开始学习 Rust 语言 但有一个问题rust analyzervscode 扩展名 它不提供代码完成 如链接所示 https rust analyzer github io manual html magic completions h
  • PyQt5:如何使QThread返回数据到主线程

    I am a PyQt 5 4 1 1初学者 我的Python是3 4 3 这是我尝试遵循的many https mayaposch wordpress com 2011 11 01 how to really truly use qthr
  • Rust 为什么要费心“let”? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我对 Rust 很感兴趣 所以我开始阅读 Rust 网站上的 Rust 编程指南 发现变量是通过以下方式声明的 let x i32 5 这意味着
  • 使用 Socket.io 向多个房间发送消息?

    是否可以使用socket io向多个房间发送消息 发送至 1 个房间 io sockets in room emit id 发送到N个房间 io sockets in room1 room2 roomN emit id 是的 可以同时发送到
  • 奇怪的跨线程 UI 错误

    我正在编写一个 WinForms 应用程序 它有两种模式 控制台或 GUI 同一解决方案中的三个项目 一个用于控制台应用程序 一个用于 UI 表单 第三个用于保存两个界面也将连接的逻辑 控制台应用程序运行绝对流畅 保存用户选择的模型 它有一
  • 如何下载 Rust API 文档?

    有没有办法可以下载 Rust 的 API 库文档或生成它们 我可以在 Rust 源中找到的唯一文档位于src docs https github com rust lang rust tree master src doc 我正在运行夜间构
  • 什么时候可以在 Java 中使用 Thead.stop() ?

    Thread stop 的 Java 文档听起来好像如果您调用 Thread stop 世界就会终结 已弃用 这种方法本质上是不安全的 停止线程 Thread stop 导致它解锁所有已锁定的监视器 作为未经检查的 ThreadDeath

随机推荐

  • JavaScript 排序函数如何工作(作为一种算法)? [复制]

    这个问题在这里已经有答案了 JavaScript带参数的排序函数 http www javascriptkit com javatutors arraysort shtml允许传入一个函数 例如 var myarray 25 8 7 41
  • 如何在 Lucene 6 中对数字字段进行排序

    我想根据数字字段对搜索结果进行排序 在下面的示例代码中 我想根据 年龄 字段进行排序 我从使用以下答案开始 如何在 Lucene 6 中对 IntPont 或 LongPoint 字段进行排序 https stackoverflow com
  • 修改framework.jar中的java代码

    我手机上的库存 ROM 与 MVNO 移动虚拟网络运营商 存在问题 基本上这意味着我的数据连接仅在漫游时有效 这是一个已知问题 已在多个 ROM 上得到修复 但我的尚未修复 为了解决这个问题 我想修改framework jar文件的源 sy
  • 如何在 Xcode 5 中重命名项目?

    如何在 Xcode 5 中重命名项目 我需要采取什么步骤 在过去 这始终是一个非常棘手的手动过程 嗯 答案在 Xcode 5 中非常非常简单 在左侧的项目导航器中 缓慢单击 2 次 项目文件名将可编辑 输入新名称 将出现一个带有警告的表 并
  • maven zip uber-jar 和 shell 脚本

    我希望 Maven 能够将由 shade plugin 创建的 uber jar 和 all files 目录中的 shell 脚本结合起来 项目结构如下所示 all files mvn script sh projB shaded jar
  • 在MySql中加入DELETE?如何?

    我有这个 query DELETE FROM classified sql table WHERE classified ad id id AND classified classified id sql table classified
  • 如何在 SQLAlchemy 和 Firebird 中将 Python 列表绑定为自定义查询中的参数?

    环境 我使用 Firebird 数据库和 SQLAlchemy 作为 ORM 包装器 背景 我知道通过使用in 可以通过sales id列出在IN子句并得到结果 我有一个必须使用的用例文本sql Question 这是我的片段 conn e
  • 如何解决 macOS 中的 zsh: command not found: flutterfire 问题?

    我跟着document https firebase flutter dev docs overview using the flutterfire cli在我的 flutter 应用程序中添加 firebase 我收到 1 条警告和消息
  • 悬停时突出显示自定义 QWidgetAction

    我的应用程序有一个QMenuBar与一些QMenus 其中每个都有多个QActions 和子QMenus 大部分的QAction 项目是衍生品QWidgetAction并重新实施QWidgetAction createWidget方法 通常
  • 在 select 语句中使用命名元组

    有没有更好的方法使用 var 目标变量在 C 7 中选择命名元组 我一定是在示例 1 中做错了什么 或者完全误解了某些内容 我似乎必须显式设置目标类型才能执行此操作 1 Fails to compile with incorrect num
  • 如何获取订单密钥以在 WooCommerce 中创建自定义订单返回 url

    这是我用来获取自定义订单返回 URL 的代码 global woocommerce test order new WC Order order id test order key test order gt order key return
  • Scipy 对数范数拟合直方图

    我正在将对数正态 pdf 拟合到一些分箱数据 但我的曲线与数据不太匹配 请参见下图 我的代码是 import numpy as np import matplotlib pyplot as plt from scipy stats impo
  • 我可以在 Microchip C18 中创建一个同时接受 ram 和 rom 指针的函数吗?

    当我声明一个接受的函数时const char 我传递了一个字符串文字 我得到了一个 警告 2066 赋值中的类型限定符不匹配 因为字符串文字是rom const char 反过来也是一样的 虽然PIC是哈佛架构 但内存被映射到一个连续的地址
  • 如何将 Enum 绑定到 bit 或 int 的 DbType?

    我正在使用 Linq2Sql 并希望将对象字段 枚举 绑定到数据库中的 bit 或 int 类型 例如 我想在我的模型中有一个性别字段 我已经编辑了 DBML 并将类型更改为指向我的枚举 我想使用相同的想法为性别创建单选按钮 我想我已经弄清
  • 在 logback 中记录并行线程

    我将尝试对我的 Selenium 框架进行简要描述 以便我可以解释我的问题 I use 硒2 当前版本2 3 1 测试NG 5 14 我设置 testng xml 文件来并行运行测试套件中的测试 只有 2 个实例 出于记录目的 我使用log
  • Chrome 中忽略 window.open 宽度

    我想创建一个固定大小的弹出窗口 但是在 Chrome 中宽度属性被忽略 但在 FF 中工作正常 这是我的代码 window open width 300 生成的弹出窗口大于给定的宽度 有什么建议么 这似乎是 Chrome 中的一些奇怪的怪癖
  • 如何从 unicode 字符串中获取正确的元素?

    我想使用索引从 unicode 字符串中获取特定字母 但是 它并没有按预期工作 Example var handwriting 1234567890 var normal abcdefghijklmnopqrstuvwxyzABCDEFGH
  • 在shiny R中输入向量然后使用它

    在 Shiny R 中 我想要一种简单的方法来将向量作为 ui R 中的用户输入 然后想在 server R 中的函数中使用它 我是闪亮的新手 请帮忙 这里有一些简单的方法可以帮助您入门 祝您好运 请记住 下次发布一些代码 否则你肯定会被否
  • 创建像 ASP.NET MVC 3 ViewBag 这样的类?

    我有一种情况 我想做一些类似于 ASP NET MVC 3 ViewBag 对象所做的事情 其中 属性是在运行时创建的 或者是在编译时 无论如何 我想知道如何创建具有这种行为的对象 我创建了这样的东西 public class MyBag
  • 如何使用 mpsc 通道在线程之间创建环形通信?

    我想生成 n 个能够与环形拓扑中的其他线程通信的线程 例如线程 0 可以向线程 1 发送消息 线程 1 可以向线程 2 发送消息 等等 线程 n 可以向线程 0 发送消息 这是我想用 n 3 实现的示例 use std sync mpsc