向无锁队列添加阻塞函数

2024-06-18

我有一个基于循环缓冲区的无锁多生产者、单消费者队列。到目前为止,它只有非阻塞push_back() and pop_front()来电。现在我想添加这些调用的阻塞版本,但我想尽量减少这对使用非阻塞版本的代码性能的影响 - 也就是说,它不应该将它们变成“默认锁定" calls.

例如。阻塞push_back()的最简单版本如下所示:

void push_back_Blocking(const T& pkg) {
    if (!push_back(pkg)) {
        unique_lock<mutex> ul(mux);
        while (!push_back(pkg)) {
            cv_notFull.wait(ul);
        }
    }
}

但不幸的是,这还需要将以下块放在“非阻塞”的末尾pop_front():

{
    std::lock_guard<mutex> lg(mux);
    cv_notFull.notify_all();
}

虽然notify单独使用几乎不会对性能产生任何影响(如果没有线程在等待),但锁却有。

所以我的问题是:
我如何(如果可能的话使用标准 c++14)添加阻塞push_back and pop_front我的队列的成员函数不会严重影响非阻塞对应项的性能(阅读:最小化系统调用)?至少只要没有线程实际被阻塞——但理想情况下也是如此。


作为参考,我当前的版本看起来与此类似(我省略了调试检查、数据对齐和显式内存排序):

template<class T, size_t N>
class MPSC_queue {
    using INDEX_TYPE = unsigned long;
    struct Idx {
        INDEX_TYPE idx;
        INDEX_TYPE version_cnt;
    };
    enum class SlotState {
        EMPTY,
        FILLED
    };
    struct Slot {
        Slot() = default;               
        std::atomic<SlotState> state= SlotState::EMPTY;
        T data{};
    };
    struct Buffer_t {
        std::array<Slot, N> data{}; 
        Buffer_t() {
            data.fill(Slot{ SlotState::EMPTY, T{} });
        }
        Slot& operator[](Idx idx) {
            return this->operator[](idx.idx);
        }
        Slot& operator[](INDEX_TYPE idx) {
            return data[idx];                   
        }
    };

    Buffer_t buffer;
    std::atomic<Idx> head{};
    std::atomic<INDEX_TYPE> tail=0;

    INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; }

    Idx next(Idx old) {
        old.idx = next(old.idx);
        old.version_cnt++;
        return old;
    }
public:     
    bool push_back(const T& val) {
        auto tHead = head.load();
        Idx wrtIdx;
        do {
            wrtIdx = next(tHead);
            if (wrtIdx.idx == tail) {
                return false;
            }
        } while (!head.compare_exchange_strong(tHead, wrtIdx));

        buffer[wrtIdx].data = val;
        buffer[wrtIdx].state = SlotState::FILLED;
        return true;
    }

    bool pop_front(T& val) {                
        auto rIdx = next(tail);
        if (buffer[rIdx].state != SlotState::FILLED) {
            return false;
        }
        val = buffer[rIdx].data;
        buffer[rIdx].state = SlotState::EMPTY;
        tail = rIdx;
        return true;
    }
};

相关问题:

我专门问了一个类似的问题,关于优化使用condition_variable::notify here https://stackoverflow.com/questions/32577466/use-condition-variable-notify-without-loacking-mutex?noredirect=1#comment53010530_32577466,但是这个问题被关闭了,因为据说是重复的这个问题 https://stackoverflow.com/questions/2763714/why-do-pthreads-condition-variable-functions-require-a-mutex%99%20condition%20variable%20functions%20require%20a%20mutex?.
我不同意,因为这个问题是关于为什么一般条件变量需要互斥锁(或者更确切地说它是 pthread 等效项) - 重点是condition_variable::wait- 而不是是否/如何避免notify部分。但显然我没有说得足够清楚(或者人们只是不同意我的观点)。

无论如何,链接问题中的答案对我没有帮助,因为这有点像XY-问题 https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem无论如何,我决定就我遇到的实际问题提出另一个问题,从而允许更广泛的可能解决方案(也许有一种方法可以完全避免条件变量)。

这个问题 https://stackoverflow.com/questions/6089917/how-to-achieve-lock-free-but-blocking-behavior也很相似,但是

  1. 这是关于 Linux 上的 C 的内容,答案使用特定于平台的内容 构造(pthreads 和 futexes)
  2. 那里的作者要求高效的阻塞调用,但根本没有非阻塞调用。另一方面,我不太关心阻塞效率,但希望保持非阻塞尽可能快。

如果有潜在的条件变量的服务员,你have锁定互斥体notify_all call.

事情是这样的状况检查 (!push_back(pkg))执行before等着条件变量(C++11 没有提供其他方法)。所以互斥是唯一可以保证这些动作之间一致性的手段。

但是,如果不涉及潜在的服务员,则可以省略锁定(和通知)。只需使用附加标志:

class MPSC_queue {
    ... // Original definitions
    std::atomic<bool> has_waiters;

public:
    void push_back_Blocking(const T& pkg) {
        if (!push_back(pkg)) {
            unique_lock<mutex> ul(mux);
            has_waiters.store(true, std::memory_order_relaxed); // #1
            while (!push_back(pkg)) { // #2 inside push_back() method
                cv_notFull.wait(ul);
                // Other waiter may clean flag while we wait. Set it again. Same as #1.
                has_waiters.store(true, std::memory_order_relaxed);
            }
            has_waiters.store(false, std::memory_order_relaxed);
        }
    }

    // Method is same as original, exposed only for #2 mark.
    bool push_back(const T& val) {
        auto tHead = head.load();
        Idx wrtIdx;
        do {
            wrtIdx = next(tHead);
            if (wrtIdx.idx == tail) { // #2
                return false;
            }
        } while (!head.compare_exchange_strong(tHead, wrtIdx));

        buffer[wrtIdx].data = val;
        buffer[wrtIdx].state = SlotState::FILLED;
        return true;
    }

    bool pop_front(T& val) {
        // Main work, same as original pop_front, exposed only for #3 mark.
        auto rIdx = next(tail);
        if (buffer[rIdx].state != SlotState::FILLED) {
            return false;
        }
        val = buffer[rIdx].data;
        buffer[rIdx].state = SlotState::EMPTY;
        tail = rIdx; // #3

        // Notification part
        if(has_waiters.load(std::memory_order_relaxed)) // #4
        {
            // There are potential waiters. Need to lock.
            std::lock_guard<mutex> lg(mux);
            cv_notFull.notify_all();
        }

        return true;
    }
};

这里的关键关系是:

  1. 设置标志于#1和阅读tail用于检查条件#2.
  2. Storing tail at #3并检查标志#4.

这两种关系都应该暴露出某种普遍秩序。那是#1之前应观察#2即使通过其他线程。同样适用于#3 and #4.

在这种情况下,我们可以保证,如果检查标志#4发现它没有设置,然后可能进一步检查条件#2会发现条件变化的影响#3。所以不加锁(和通知)是安全的,因为不可能有服务员。

在您当前的实施中普遍秩序之间#1 and #2通过加载提供tail与隐含的内存顺序seq_cst。之间的顺序相同#3 and #4通过存储提供tail与隐含的内存顺序seq_cst.

在这种方法中,“如果没有服务员就不要锁”,普遍秩序是最棘手的部分。在这两种关系中,先读后写的顺序,这是无法通过任何组合来实现的内存顺序获取 and 内存顺序释放. So 内存顺序seq_cst应该使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

向无锁队列添加阻塞函数 的相关文章

随机推荐

  • MySQL:计算日期/时间之间的差异 - 仅在周一至周五“工作周”期间

    我需要计算开始日期 时间和结束日期 时间之间的差异 但是 我只想在 5 天的工作周内执行此操作 不包括周六 周日 做这个的最好方式是什么 我的想法是 从日期开始 我必须获取星期几 如果是工作日 那么我将添加到累加器中 如果不是 那么我不会添
  • Three.js 中的自定义纹理着色器

    我只是想创建一个非常简单的片段着色器 将指定的纹理绘制到网格上 我研究了一些实现相同功能的自定义片段着色器 并构建了我自己的着色器并围绕它支持 JS 代码 然而 它就是行不通 这是我尝试运行的代码的工作抽象 顶点着色器
  • PHP Laravel:未找到特征

    我在命名空间和使用方面遇到一些问题 我收到此错误 未找到特征 Billing BillingInterface 这些是我的 Laravel 应用程序中的文件 计费 php namespace Billing BillingInterface
  • C#:在特定时间启动应用程序

    我想在计算机上的特定时间启动应用程序 如何在 C 中执行此操作 只是我正在开发一个应用程序 当应用程序必须运行并执行特定任务时 用户将设置一个计时器 我听说在 Windows 中有一个工具可用于在特定时间启动应用程序 是的 Windows
  • NumPy 和 memmap:[Errno 24] 打开文件太多

    我正在处理大型矩阵 因此我使用 NumPy memmap 但是 我收到错误 因为显然 memmap 使用的文件描述符没有关闭 import numpy import tempfile counter 0 while True temp fd
  • 修改附加套件中 Firefox 下载对话框的内容

    我希望能够在开始文件下载时在 Firefox 中弹出的下载对话框中添加一个选项 是否可以使用新的附加 SDK 来执行此操作 还是必须使用旧方法 编辑 显然 如果选择了新选项 我需要一种方法来了解它并基于它执行代码 这就是你会用的东西XUL
  • 使用 Django 从标准输出返回 pdf 响应

    我正在使用 wkhtmltopdf 创建 PDF 文件 但是我不知道如何正确返回它们 所以我必须将它们写入我的媒体文件夹 然后重定向到刚刚创建的文件 编辑 Ian 的建议是写入 STDOUT 因此我更改了 wkhtmltopdf 命令来执行
  • 从 csv 文件中读取奇数行

    这看起来相当简单 我只需要从 R 中的数据文件中读取奇数行并创建一个新的数据框 我怎样才能实现这个目标 read csv filename csv c TRUE FALSE 怎么运行的 功能read csv用于读取整个文件并返回包含所有行的
  • 对数据进行分布拟合 - MATLAB

    我正在尝试对从显微镜图像中收集的一些数据进行分布 我们知道 152 左右的峰值是由于泊松过程造成的 我想将分布拟合到图像中心的大密度 同时忽略高强度数据 我知道如何将正态分布拟合到数据 红色曲线 但它不能很好地捕获右侧的重尾 尽管泊松分布应
  • 如何在 PHP 中获取 html 中的文件路径?

    有人可以告诉我如何使用 html 获取文件路径
  • Google App Engine 实例不断快速关闭

    所以我已经使用应用程序引擎很长一段时间了 没有任何问题 我知道 如果应用程序有一段时间没有被访问者点击 那么实例将关闭 并且第一个访问该网站的访问者将有几秒钟的延迟 同时新实例启动 然而 最近这些实例似乎只在很短的时间内保持活动状态 有时不
  • Webpack 4、postcss-loader 和 autoprefixer 插件

    我在尝试让自动前缀器工作时感到非常沮丧 这是我的webpack config js const HtmlWebPackPlugin require html webpack plugin const MiniCssExtractPlugin
  • 为什么 Dijkstra 算法使用减密钥?

    Dijkstra 教给我的算法如下 while pqueue is not empty distance node pqueue delete min if node has been visited continue else mark
  • 阻止UAC虚拟化?

    我有一个 VB6 应用程序 我已经销售了 12 年多了 有时 我的用户很难让应用程序运行 数据写入将进入 My Documents 因此除了安装文件 EXE 等 之外什么都不会进入 C Program Files 或 C Program F
  • 为什么 Tomcat 缺少内置的速率限制过滤器?

    从几个来源 1 https serverfault com questions 177742 2 http tomcat 10 x6 nabble com tomcat bandwidth limiter transfer rate lim
  • 将 ActionBarSherlock 与新的 SupportMapFragment 结合使用

    我正在考虑使用 ActionbarSherlock 但有一个查询阻碍了我 所以我的应用程序需要完全向后兼容 API Level 7 我需要在我的应用程序中实现新的 Google 地图 为此我需要使用 SupportMapFragment 类
  • 如何在 Flask 中调用不同蓝图的方法?

    我有一个具有多个蓝图模块的应用程序 我想调用一个方法 路径 该方法通常会从不同蓝图的路径中返回视图或渲染模板 如何才能正确完成此操作 谢谢 视图只是函数 导入该函数并直接调用它 传入它可能定义的任何路由参数 蓝图的作用是更轻松地在公共前缀下
  • 为什么 Github API 只返回前 100 个监视的存储库?

    我正在 Github 上查看 392 个存储库 然而 Github API 只返回 100 有人知道为什么吗 https github com api v2 json repos watched trivektor https github
  • 将 `new Date ()` 存储在 JSON 对象中

    我有以下字段验证器对象 type date min new Date 我希望我可以存储new Date 作为 JSON 中的表达式 解析时会执行 保存时间戳 type date min new Date getTime 然后你再读一遍 va
  • 向无锁队列添加阻塞函数

    我有一个基于循环缓冲区的无锁多生产者 单消费者队列 到目前为止 它只有非阻塞push back and pop front 来电 现在我想添加这些调用的阻塞版本 但我想尽量减少这对使用非阻塞版本的代码性能的影响 也就是说 它不应该将它们变成