C++ 中具有双缓冲区的单生产者、单消费者数据结构

2024-01-04

我在 $work 有一个应用程序,我必须在以不同频率调度的两个实时线程之间移动。 (实际的调度超出了我的控制范围。)应用程序是硬实时的(其中一个线程必须驱动硬件接口),因此线程之间的数据传输应该是无锁和无等待的可能的程度。

需要注意的是,只需要传输一个数据块:因为两个线程以不同的速率运行,有时会在较慢线程的两次唤醒之间完成较快线程的两次迭代;在这种情况下,可以覆盖写入缓冲区中的数据,以便较慢的线程仅获取最新数据。

换句话说,双缓冲解决方案代替队列就足够了。这两个缓冲区是在初始化期间分配的,读取线程和写入线程可以调用该类的方法来获取指向这些缓冲区之一的指针。

C++代码:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

为了避免读取器线程中的陈旧数据,有效负载结构被版本化。 为了促进线程之间的双向数据传输,使用了上述怪物的两个实例,方向相反。

问题:

  • 这个方案线程安全吗?如果坏了,在哪里?
  • 没有互斥体可以完成吗?也许只有内存屏障或 CAS 指令?
  • 可以做得更好吗?

非常有趣的问题!比我最初想象的要棘手得多:-) 我喜欢无锁解决方案,因此我尝试在下面找到一种解决方案。

有很多方法可以思考这个系统。你可以建模 它作为一个固定大小的循环缓冲区/队列(有两个条目),但是然后 您失去了更新下一个可用消费值的能力, 因为你不知道消费者是否已经开始阅读最近发布的内容 值或仍在(可能)阅读前一个。所以额外的状态 为了达到更优化的效果,需要超出标准环形缓冲区的范围 解决方案。

首先请注意,总有一个生产者可以安全写入的单元格 在任何给定时间点;如果消费者正在读取一个单元格,则 其他的都可以写入。我们将可以安全写入的单元格称为 “活动”单元格(可以潜在读取的单元格是任何单元格isn't活跃的那个)。仅当其他小区不活跃时,才可以切换活动小区 当前正在读取。

与始终可以写入的活动单元不同,非活动单元可以 仅当它包含值时才被读取;一旦这个价值被消耗掉,它就消失了。 (这意味着在积极的生产者的情况下可以避免活锁;在某些情况下 此时,消费者将清空电池并将停止触摸电池。一次 发生这种情况时,生产者肯定可以发布一个值,而在此之前, 如果消费者不在其中,它只能发布一个值(更改活动单元格) 读到中间。)

如果有is一个可以被消费的值,只有消费者可以改变它 事实(无论如何,对于非活动单元);后续生产可能会改变哪个单元 处于活动状态且已发布值,但值将始终可供读取,直到 它被消耗了。

一旦生产者完成对活动单元的写入,它可以通过以下方式“发布”该值: 更改哪个单元格是活动单元格(交换索引),前提是消费者是 不在读取另一个单元格的过程中。如果消费者is在中间 读取另一个单元格时,交换不会发生,但在这种情况下,消费者可以交换 后it's完成读取值,前提是生产者不在中间 写入(如果是,生产者将在完成后进行交换)。 事实上,一般来说,消费者在读完之后总是可以交换(如果它是唯一的一个) 访问系统),因为消费者的虚假交换是良性的:如果存在 另一个单元格中的某些内容,然后交换将导致接下来读取该内容,并且如果 没有,交换没有任何影响。

因此,我们需要一个共享变量来跟踪活动单元格是什么,并且我们还需要一个 生产者和消费者都可以指示他们是否处于生产过程中的方式 手术。我们可以将这三个状态按顺序存储到一个原子变量中 能够同时(原子地)影响它们。 我们还需要一种方法让消费者检查里面是否有东西 首先是非活动单元,并且两个线程都可以修改该状态 作为适当的。我尝试了其他几种方法,但最终最简单的就是 也将此信息包含在另一个原子变量中。这让事情变得很多 推理起来更简单,因为系统中的所有状态更改都是原子的。

我想出了一个无等待的实现(无锁,所有操作完成 在有限数量的指令中)。

代码时间!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

请注意,语义是消费者永远不能两次读取给定值, 并且它读取的值总是比它最后读取的值新。也还算可以 内存使用效率高(两个缓冲区,就像您原来的解决方案一样)。我避免了 CAS 循环 因为它们通常比争用的单个原子操作效率低。

如果您决定使用上面的代码,我建议您首先为其编写一些全面的(线程)单元测试。 和适当的基准。我确实测试过,但只是勉强测试。如果您发现任何错误,请告诉我:-)

我的单元测试:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

至于你原来的实现,我只是粗略地看了一下(这更有趣 设计新东西,呵呵),但 david.pfx 的答案似乎解决了你问题的这一部分。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C++ 中具有双缓冲区的单生产者、单消费者数据结构 的相关文章

  • 进程何时获得 SIGABRT(信号 6)?

    C 中进程获得 SIGABRT 的场景有哪些 该信号是否始终来自进程内部 或者该信号可以从一个进程发送到另一个进程吗 有没有办法识别哪个进程正在发送该信号 abort 向调用进程发送SIGABRT信号 就是这样abort 基本上有效 abo
  • 我的线程图像生成应用程序如何将其数据传输到 GUI?

    Mandelbrot 生成器的缓慢多精度实现 线程化 使用 POSIX 线程 Gtk 图形用户界面 我有点失落了 这是我第一次尝试编写线程程序 我实际上并没有尝试转换它的单线程版本 只是尝试实现基本框架 到目前为止它是如何工作的简要描述 M
  • 为什么我不能用 `= delete;` 声明纯虚函数?

    Intro 纯虚函数使用通用语法声明 virtual f 0 然而 自 c 11 以来 有一种方法可以显式地传达non existence 特殊 成员函数的 Mystruct delete eg default constructor Q
  • 以编程方式检查页面是否需要基于 web.config 设置进行身份验证

    我想知道是否有一种方法可以检查页面是否需要基于 web config 设置进行身份验证 基本上如果有这样的节点
  • 如何创建可以像 UserControl 一样编辑的 TabPage 子类?

    我想创建一个包含一些控件的 TabPage 子类 并且我想通过设计器来控制这些控件的布局和属性 但是 如果我在设计器中打开子类 我将无法像在 UserControl 上那样定位它们 我不想创建一个带有 UserControl 实例的 Tab
  • 使用post方法将多个参数发送到asp.net core 3 mvc操作

    使用 http post 方法向 asp net mvc core 3 操作发送具有多个参数的 ajax 请求时存在问题 参数不绑定 在 dot net 框架 asp net web api 中存在类似的限制 但在 asp net mvc
  • C++:重写已弃用的虚拟方法时出现弃用警告

    我有一个纯虚拟类 它有一个纯虚拟方法 应该是const 但不幸的是不是 该接口位于库中 并且该类由单独项目中的其他几个类继承 我正在尝试使用这个方法const不会破坏兼容性 至少在一段时间内 但我找不到在非常量方法重载时产生警告的方法 以下
  • POCO HTTPSClientSession 发送请求时遇到问题 - 证书验证失败

    我正在尝试使用 POCO 库编写一个向服务器发出 HTTPS 请求的程序 出于测试目的 我正在连接到具有自签名证书的服务器 并且我希望允许客户端进行连接 为了允许这种情况发生 我尝试安装InvalidCertificateHandler这是
  • 暂停下载线程

    我正在用 C 编写一个非常简单的批量下载程序 该程序读取要下载的 URL 的 txt 文件 我已经设置了一个全局线程和委托来更新 GUI 按下 开始 按钮即可创建并启动该线程 我想要做的是有一个 暂停 按钮 使我能够暂停下载 直到点击 恢复
  • 如何从 C# 控制器重定向到外部 url

    我使用 C 控制器作为网络服务 在其中我想将用户重定向到外部网址 我该怎么做 Tried System Web HttpContext Current Response Redirect 但没有成功 使用控制器的重定向 http msdn
  • Qt 创建布局并动态添加小部件到布局

    我正在尝试在 MainWindow 类中动态创建布局 我有四个框架 它们是用网格布局对象放置的 每个框架都包含一个自定义的 ClockWidget 我希望 ClockWidget 对象在调整主窗口大小时相应地调整大小 因此我需要将它们添加到
  • 将数据打印到文件

    我已经超载了 lt lt 运算符 使其写入文件并写入控制台 我已经为同一个函数创建了 8 个线程 并且我想输出 hello hi 如果我在无限循环中运行这个线程例程 文件中的o p是 hello hi hello hi hello hi e
  • C# 中条件编译符号的编译时检查(参见示例)?

    在 C C 中你可以这样做 define IN USE 1 define NOT IN USE 1 define USING system 1 system 1 IN USE 进而 define MY SYSTEM IN USE if US
  • 将构建日期放入“关于”框中

    我有一个带有 关于 框的 C WinForms 应用程序 我使用以下方法将版本号放入 关于 框中 FileVersionInfo GetVersionInfo Assembly GetExecutingAssembly Location F
  • 当“int”处于最大值并使用 postfix ++ 进行测试时,代码定义良好吗?

    示例 未定义行为的一个示例是整数溢出的行为 C11dr 3 4 3 3 int溢出是未定义的行为 但这是否适用于存在循环的以下内容 并且不使用现在超出范围的副作用i 特别是 这是否后缀增量规格帮助 结果的值计算在副作用之前排序 更新操作数的
  • 如何一步步遍历目录树?

    我发现了很多关于遍历目录树的示例 但我需要一些不同的东西 我需要一个带有某种方法的类 每次调用都会从目录返回一个文件 并逐渐遍历目录树 请问我该怎么做 我正在使用函数 FindFirstFile FindNextFile 和 FindClo
  • 当前的 x86 架构是否支持非临时加载(来自“正常”内存)?

    我知道有关此主题的多个问题 但是 我没有看到任何明确的答案或任何基准测量 因此 我创建了一个处理两个整数数组的简单程序 第一个数组a非常大 64 MB 第二个数组b很小 无法放入 L1 缓存 程序迭代a并将其元素添加到相应的元素中b在模块化
  • 剪贴板在 .NET 3.5 和 4 中的行为有所不同,但为什么呢?

    我们最近将一个非常大的项目从 NET Framework 3 5 升级到 4 最初一切似乎都工作正常 但现在复制粘贴操作开始出现错误 我已经成功制作了一个小型的可复制应用程序 它显示了 NET 3 5 和 4 中的不同行为 我还找到了一种解
  • 什么是 __declspec 以及何时需要使用它?

    我见过这样的例子 declspec在我正在阅读的代码中 它是什么 我什么时候需要使用这个构造 这是 Microsoft 对 C 语言的特定扩展 它允许您使用存储类信息来赋予类型或函数属性 文档 declspec C https learn
  • 带重定向标准流的 C# + telnet 进程立即退出

    我正在尝试用 C 做一个 脚本化 telnet 项目 有点类似于Tcl期望 http expect nist gov 我需要为其启动 telnet 进程并重定向 和处理 其 stdin stdout 流 问题是 生成的 telnet 进程在

随机推荐

  • Knockoutjs - 对大型可观察数组进行排序

    我在带有可观察数组的页面上定义了一个淘汰模型 我想要有按钮来按不同属性对数组进行排序 我有一个 工作 解决方案 但对于大型数组来说它非常慢 jsFiddle http jsfiddle net 7JNrc http jsfiddle net
  • 从生成的 Protocol Buffer 类继承

    Protocol Buffer 文档警告 您永远不应该通过继承向生成的类添加行为 从他们 这会破坏内部机制并且不好 无论如何 面向对象的实践 source 协议缓冲区基础知识 https developers google com prot
  • Google Play 服务以编程方式设置应用 ID

    我知道你可以通过AndroidManifest设置com google android gms games APP ID 但是有没有办法通过某些生成器或类似的东西以编程方式设置它 None
  • 更改南迁目录

    如何更改 South 查找应用程序迁移的位置 默认情况下 South 假定应用程序的迁移位于 migrations 中 但是 我已经迁移了安装在 usr local lib python 2 6 dist packages 的第三方包的模型
  • VHDL - PhysDesignRules:367

    当我尝试从 VHDL 代码合成 实现和生成程序文件时 我收到警告 当我尝试合成时出现此错误 WARNING Xst 647 Input
  • 如何将 pyspark 数据框列拆分为两列(下面的示例)?

    该列在单行中多次使用分隔符 因此split并不那么简单 分裂后 只有第一个分隔符在这种情况下必须考虑发生的情况 截至目前 我正在这样做 不过 我觉得可以有更好的解决方案 testdf spark createDataFrame Dog me
  • 如何在android中处理搜索视图的后退按钮

    SearchView searchView SearchView MenuItemCompat getActionView menu findItem Menus SEARCH searchView setQueryHint this ge
  • 由信用卡资助的 Paypal 账户 = 10417 错误

    我来这里是为了尝试解决贝宝中没有人可以帮助我们的黑洞 我们有一个企业帐户 Paypal Express 数字商品 已验证 限制解除等 我们将贝宝快递集成罚款作为我们唯一的付款方式 但有一个问题 任何由信用卡资助的 Paypal 帐户都会完全
  • YUI - 获取真实的元素宽度?

    我正在使用 YUI 需要获取元素的真实宽度 元素的宽度可以如下确定 宽度 左边框 右边框 左内边距 右内边距 左外边距 右外边距 以下是我的想法 它似乎正在发挥作用 我只是想知道这是否是确定此问题的最佳方法 或者是否有更有效的方法 YUI
  • 使用 JSch sudo 示例和 Channel.setPty 在远程主机上运行 sudo 命令

    我在以下链接中使用了 JSch Sudo 示例 http www jcraft com jsch examples Sudo java html http www jcraft com jsch examples Sudo java htm
  • Qt Creator 自定义构建步骤无法复制文件

    我想将文件复制到 Qt Creator 自定义构建步骤中构建 dist 目录 But after I Build the error thrown Could not start process copy C Users W Desktop
  • python中高效统计词频

    我想计算文本文件中所有单词的频率 gt gt gt countInFile test txt 应该返回 aaa 1 bbb 2 ccc 1 如果目标文本文件如下 test txt aaa bbb ccc bbb 我已经用纯 python 实
  • 如何在JUnit 5的“@BeforeEach”方法中打印“要执行”的@Test方法的名称?

    我知道我们可以在 JUnit 4 中使用 Rule and TestName但是我正在使用 JUnit 5 Jupiter 并且正在努力寻找一种方法来打印测试方法 要执行的 名称 BeforeEach method 声明一个TestInfo
  • 寻找使用 git-format-patch 和 git am 的工作流程示例

    我正在考虑让我的学生使用 git 进行结对编程 由于学生的工作必须保密 因此不可能公开回购 相反 每个学生都会有一个他们自己维护的私人存储库 并且他们需要使用 git format patch 交换补丁 我已经阅读了手册页 但我有点不清楚w
  • 导入错误:没有名为 virtualenv 的模块

    我在 windows7 上使用 Django 1 3 7 和 python 2 7 6 当我在这行代码中执行manage py时出现错误 import shutil sys virtualenv subprocess amd 运行它 我收到
  • 我如何引用values.yaml中的命名空间?

    我希望能够引用当前的命名空间values yaml使用它来为某些值添加后缀 如下所示 in values yaml someParam someval Release Namespace 以这种方式定义它比进入我的所有模板并添加要好得多 R
  • Jquery mobile t.split 不是一个函数

    我正在开发一个网站 该网站从数据库动态获取内容并将其放入 HTML 代码中 该代码应由 JQuery Mobile 页面组成 这样我可以单击链接 它将滑动到相应的页面 我正在运行 JQuery 1 9 1 和 JQuery Mobile 1
  • 使用 SQSH 对名称中包含空格的表进行 SELECT 查询

    我在 Ubuntu 10 04 上使用 SQSH 版本 2 1 通过如下命令连接到 MSSQL 数据库 sqsh S server U user P password D database 我有一个名为 我的表 的表 但我找不到对其运行 S
  • 在 MVC 4.0 中实现动态操作的路由

    是否可以在 MVC 中定义一个路由 根据部分路由动态解析操作 public class PersonalController public ActionResult FriendGroup code public ActionResult
  • C++ 中具有双缓冲区的单生产者、单消费者数据结构

    我在 work 有一个应用程序 我必须在以不同频率调度的两个实时线程之间移动 实际的调度超出了我的控制范围 应用程序是硬实时的 其中一个线程必须驱动硬件接口 因此线程之间的数据传输应该是无锁和无等待的可能的程度 需要注意的是 只需要传输一个