如何使用 mpsc 通道在线程之间创建环形通信?

2024-04-10

我想生成 n 个能够与环形拓扑中的其他线程通信的线程,例如线程 0 可以向线程 1 发送消息,线程 1 可以向线程 2 发送消息,等等,线程 n 可以向线程 0 发送消息。

这是我想用 n=3 实现的示例:

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();

let child0 = thread::spawn(move || {
    tx0.send(0).unwrap();
    println!("thread 0 sent: 0");
    println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
    tx1.send(1).unwrap();
    println!("thread 1 sent: 1");
    println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
    tx2.send(2).unwrap();
    println!("thread 2 sent: 2");
    println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});

child0.join();
child1.join();
child2.join();

在这里,我在循环中创建通道,将它们存储在向量中,对发送者重新排序,将它们存储在新向量中,然后生成每个线程,每个线程都有自己的发送者-接收者(tx1/rx0、tx2/rx1 等)对。

const NTHREADS: usize = 8;

// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
    (0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();

// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

let mut children = Vec::new();
for i in 0..NTHREADS {
    let (tx, rx) = channels_ring.remove(i);

    let child = thread::spawn(move || {
        tx.send(i).unwrap();
        println!("thread {} sent: {}", i, i);
        println!("thread {} recv: {:?}", i, rx.recv().unwrap());
    });

    children.push(child);
}

for child in children {
    let _ = child.join();
}

这不起作用,因为无法复制 Sender 来创建新向量。 但是,如果我使用 refs (&发件人):

let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            &channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

我无法生成线程,因为std::sync::mpsc::Sender<i32>不能在线程之间安全地共享。


Senders and Receivers 无法共享,因此您需要move他们进入各自的线程。这意味着将它们从Vec否则消耗Vec迭代时 - 即使作为中间步骤,向量也不允许处于无效状态(有孔)。迭代向量into_iter将通过消耗它们来实现这一点。

可以使用一个小技巧让发送者和接收者在循环中配对,即创建两个向量;发送者之一和接收者之一;然后旋转一个,以便每个向量中的相同索引将为您提供所需的对。

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

fn main() {
    const NTHREADS: usize = 8;

    // create n channels
    let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
        (0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();

    // move the first sender to the back
    senders.rotate_left(1);

    let children: Vec<_> = senders
        .into_iter()
        .zip(receivers.into_iter())
        .enumerate()
        .map(|(i, (tx, rx))| {
            thread::spawn(move || {
                tx.send(i as i32).unwrap();
                println!("thread {} sent: {}", i, i);
                println!("thread {} recv: {:?}", i, rx.recv().unwrap());
            })
        })
        .collect();

    for child in children {
        let _ = child.join();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 mpsc 通道在线程之间创建环形通信? 的相关文章

  • 当应用程序终止时,我可以安全地依赖 Threads 中的 IsBackground 吗?

    我正在 GUI 中运行一些后台线程 目前 我正在实现个人线程取消代码 但线程中有 IsBackground 属性 根据 MSDN 它们会自行取消 我知道它将进入 Thread Abort 这很令人讨厌 但是在这个后台线程中没有任何事情需要我
  • 等待 .NET 线程启动的正确方法是什么?

    我在微软网站上的线程教程中阅读了以下内容 http msdn microsoft com en us library aa645740 v vs 71 aspx http msdn microsoft com en us library a
  • 如何将动态格式字符串与格式一起使用!宏?

    我想使用format 宏与String作为第一个参数 但因为宏需要字符串文字 所以我无法传递任何与它不同的内容 我想这样做是为了将字符串动态添加到当前字符串中 以便在视图引擎中使用 如果有更好的方法 我愿意接受建议 let test Str
  • 可变借用不止一次[重复]

    这个问题在这里已经有答案了 这是无法编译的简短示例的简短示例 错误在于add1功能 如果我这样做的话它会起作用add2 但这不是很干 有更多经验的人能否启发我如何以比以前更好的方式克服可变借用错误 add2 struct S1 full b
  • C# 的空条件委托调用线程安全吗? [复制]

    这个问题在这里已经有答案了 这就是我一直以来编写事件引发者的方式 例如属性更改 public event PropertyChangedEventHandler PropertyChanged private void RaisePrope
  • 使用互斥锁来阻止临界区外部的执行

    我不确定我的术语是否正确 但这里是 我有一个由多个线程使用的函数来写入数据 在注释中使用伪代码来说明我想要的内容 these are initiated in the constructor int data std atomic
  • 在多线程环境中,Collections.sort 方法有时会抛出 ConcurrentModificationException。列表没有进行结构性修改

    package CollectionsTS import java util ArrayList import java util Collections import java util HashSet import java util
  • 有没有办法使用现有的结构作为枚举变体?

    我使用枚举来实现多态性 类似于以下内容 enum MyType Variant1 a i32 b i32 Variant2 a bool b bool 有没有干净的方法来使用现有的结构Variant1 and Variant2 我做了以下事
  • 如何与超级请求处理程序共享不可变的配置数据?

    我正在尝试用 Rust 开发一个基于超级的服务器应用程序 有一个 INI 文件保存诸如绑定 IP 数据库等配置 我不想在每个请求上解析 INI 文件 并且可以保留配置数据直到服务器重新启动 如何向请求处理程序提供已解析数据的结构 我尝试过几
  • 线程自身连接

    我很怀疑 当线程加入自身时会发生什么 即线程自行调用 join 方法 我没有收到任何错误 样本 public class JoinItself extends Thread public void run System out printl
  • 线程池,C++

    我正在使用 C 开发一个网络程序 我想实现一个 pthread 池 每当我从接收套接字接收到一个事件时 我都会将数据放入线程池中的队列中 我正在考虑创建 5 个独立的线程 并将持续检查队列以查看是否有任何传入数据需要完成 这是一个非常简单的
  • C#中为线程指定特殊的cpu

    我有 2 个线程 我想告诉其中一个在第一个 cpu 上运行 第二个在第二个 cpu 上运行 例如在具有两个 cpu 的机器中 我怎样才能做到这一点 这是我的代码 UCI UCIMain new UCI Thread UCIThread ne
  • 创建具有特定权限的线程C++

    我有一个多线程应用程序 我想创建一个具有不同用户权限的线程 例如 多域管理员权限 但我找不到任何 Win32 APICreateThread要做到这一点 如何创建具有特定用户权限的线程 thanks 调用 CreateThread CREA
  • 如何在 Rust 中为引用创建“Iterable”特征?

    我正在尝试创造一种特质来捕捉iter函数于slice也VecDeque BTreeMap and HashMap 我希望这个特征的实现者能够指定和实现他们自己的迭代器类型 但看起来这个迭代器类型必须有一个生命周期参数 并且不能作为关联类型给
  • 如何通过start-stop-daemon正常关闭Spring Boot应用程序[重复]

    这个问题在这里已经有答案了 我们有一个多线程 Spring Boot 应用程序 它作为守护进程在 Linux 机器上运行 当我尝试像这样通过启动停止守护进程停止应用程序时 start stop daemon stop quiet retry
  • 如何使用 wait() 和 notification() 正确暂停线程

    我想要一个启动线程并提供暂停和继续该线程的方法的类 我的第一个方法是使用标志 只要该值为 true 它就会循环 sleep 方法 就像是 public class Bot private Thread t private boolean i
  • 断点会停止所有线程吗?

    如果我的程序中有两个线程同时运行 并在其中一个线程上设置了断点 那么当遇到此断点时 另一个线程也会停止 还是会继续执行 我用 Java 编写并使用 NetBeans 断点可以选择它们的行为方式 挂起单个线程或所有线程
  • 如何限制Erlang VM(BEAM)使用的核心数量?

    我正在具有 2 个四核 Xeon E5520 2 2GHz 24 0GB RAM 和 Erlang R15B02 启用 SMP 的节点上运行实验 我想知道是否可以限制Erlang VM使用的核心数量 以便我可以暂时禁用一些核心并逐步增加数量
  • 如何使用 runOnUiThread 而不出现“无法对非静态方法进行静态引用”编译器错误

    我有一个主课 ClientPlayer extends Activity 和一项服务 LotteryServer extends Service implements Runnable 当尝试在此服务的 run 方法中使用 RunOnUiT
  • 终结器线程的范围是什么 - 每个应用程序域或每个进程?

    根据我的所有阅读 应该有一个 GC 线程来调用所有终结器 现在的问题是这个 一个 线程的范围是什么 每个进程或每个应用程序域 因为域的整体目的是在一个进程空间中分离并创建 独立 的不同应用程序 I read here http dn cod

随机推荐

  • JavaScript 排序函数如何工作(作为一种算法)? [复制]

    这个问题在这里已经有答案了 JavaScript带参数的排序函数 http www javascriptkit com javatutors arraysort shtml允许传入一个函数 例如 var myarray 25 8 7 41
  • 如何在 Lucene 6 中对数字字段进行排序

    我想根据数字字段对搜索结果进行排序 在下面的示例代码中 我想根据 年龄 字段进行排序 我从使用以下答案开始 如何在 Lucene 6 中对 IntPont 或 LongPoint 字段进行排序 https stackoverflow com
  • 修改framework.jar中的java代码

    我手机上的库存 ROM 与 MVNO 移动虚拟网络运营商 存在问题 基本上这意味着我的数据连接仅在漫游时有效 这是一个已知问题 已在多个 ROM 上得到修复 但我的尚未修复 为了解决这个问题 我想修改framework jar文件的源 sy
  • 如何在 Xcode 5 中重命名项目?

    如何在 Xcode 5 中重命名项目 我需要采取什么步骤 在过去 这始终是一个非常棘手的手动过程 嗯 答案在 Xcode 5 中非常非常简单 在左侧的项目导航器中 缓慢单击 2 次 项目文件名将可编辑 输入新名称 将出现一个带有警告的表 并
  • maven zip uber-jar 和 shell 脚本

    我希望 Maven 能够将由 shade plugin 创建的 uber jar 和 all files 目录中的 shell 脚本结合起来 项目结构如下所示 all files mvn script sh projB shaded jar
  • 在MySql中加入DELETE?如何?

    我有这个 query DELETE FROM classified sql table WHERE classified ad id id AND classified classified id sql table classified
  • 如何在 SQLAlchemy 和 Firebird 中将 Python 列表绑定为自定义查询中的参数?

    环境 我使用 Firebird 数据库和 SQLAlchemy 作为 ORM 包装器 背景 我知道通过使用in 可以通过sales id列出在IN子句并得到结果 我有一个必须使用的用例文本sql Question 这是我的片段 conn e
  • 如何解决 macOS 中的 zsh: command not found: flutterfire 问题?

    我跟着document https firebase flutter dev docs overview using the flutterfire cli在我的 flutter 应用程序中添加 firebase 我收到 1 条警告和消息
  • 悬停时突出显示自定义 QWidgetAction

    我的应用程序有一个QMenuBar与一些QMenus 其中每个都有多个QActions 和子QMenus 大部分的QAction 项目是衍生品QWidgetAction并重新实施QWidgetAction createWidget方法 通常
  • 在 select 语句中使用命名元组

    有没有更好的方法使用 var 目标变量在 C 7 中选择命名元组 我一定是在示例 1 中做错了什么 或者完全误解了某些内容 我似乎必须显式设置目标类型才能执行此操作 1 Fails to compile with incorrect num
  • 如何获取订单密钥以在 WooCommerce 中创建自定义订单返回 url

    这是我用来获取自定义订单返回 URL 的代码 global woocommerce test order new WC Order order id test order key test order gt order key return
  • Scipy 对数范数拟合直方图

    我正在将对数正态 pdf 拟合到一些分箱数据 但我的曲线与数据不太匹配 请参见下图 我的代码是 import numpy as np import matplotlib pyplot as plt from scipy stats impo
  • 我可以在 Microchip C18 中创建一个同时接受 ram 和 rom 指针的函数吗?

    当我声明一个接受的函数时const char 我传递了一个字符串文字 我得到了一个 警告 2066 赋值中的类型限定符不匹配 因为字符串文字是rom const char 反过来也是一样的 虽然PIC是哈佛架构 但内存被映射到一个连续的地址
  • 如何将 Enum 绑定到 bit 或 int 的 DbType?

    我正在使用 Linq2Sql 并希望将对象字段 枚举 绑定到数据库中的 bit 或 int 类型 例如 我想在我的模型中有一个性别字段 我已经编辑了 DBML 并将类型更改为指向我的枚举 我想使用相同的想法为性别创建单选按钮 我想我已经弄清
  • 在 logback 中记录并行线程

    我将尝试对我的 Selenium 框架进行简要描述 以便我可以解释我的问题 I use 硒2 当前版本2 3 1 测试NG 5 14 我设置 testng xml 文件来并行运行测试套件中的测试 只有 2 个实例 出于记录目的 我使用log
  • Chrome 中忽略 window.open 宽度

    我想创建一个固定大小的弹出窗口 但是在 Chrome 中宽度属性被忽略 但在 FF 中工作正常 这是我的代码 window open width 300 生成的弹出窗口大于给定的宽度 有什么建议么 这似乎是 Chrome 中的一些奇怪的怪癖
  • 如何从 unicode 字符串中获取正确的元素?

    我想使用索引从 unicode 字符串中获取特定字母 但是 它并没有按预期工作 Example var handwriting 1234567890 var normal abcdefghijklmnopqrstuvwxyzABCDEFGH
  • 在shiny R中输入向量然后使用它

    在 Shiny R 中 我想要一种简单的方法来将向量作为 ui R 中的用户输入 然后想在 server R 中的函数中使用它 我是闪亮的新手 请帮忙 这里有一些简单的方法可以帮助您入门 祝您好运 请记住 下次发布一些代码 否则你肯定会被否
  • 创建像 ASP.NET MVC 3 ViewBag 这样的类?

    我有一种情况 我想做一些类似于 ASP NET MVC 3 ViewBag 对象所做的事情 其中 属性是在运行时创建的 或者是在编译时 无论如何 我想知道如何创建具有这种行为的对象 我创建了这样的东西 public class MyBag
  • 如何使用 mpsc 通道在线程之间创建环形通信?

    我想生成 n 个能够与环形拓扑中的其他线程通信的线程 例如线程 0 可以向线程 1 发送消息 线程 1 可以向线程 2 发送消息 等等 线程 n 可以向线程 0 发送消息 这是我想用 n 3 实现的示例 use std sync mpsc