无锁堆栈实现想法 - 目前已损坏

2023-12-23

我想出了一个想法,尝试实现一个无锁堆栈,该堆栈不依赖引用计数来解决 ABA 问题,并且还可以正确处理内存回收。它在概念上与 RCU 类似,并且依赖于两个功能:将列表条目标记为已删除,以及跟踪遍历列表的读者。前者很简单,它只使用指针的LSB。后者是我对实现无限无锁堆栈的方法的“聪明”尝试。

基本上,当任何线程尝试遍历列表时,一个原子计数器 (list.entries) 就会增加。当遍历完成时,第二个计数器(list.exits)会递增。

节点分配由push 处理,释放由pop 处理。

推入和弹出操作与简单的无锁堆栈实现非常相似,但是必须遍历标记为删除的节点才能到达未标记的条目。因此,推送基本上很像链表插入。

pop 操作类似地遍历列表,但它在遍历时使用atomic_fetch_or 将节点标记为已删除,直到到达未标记的节点。

在遍历完 0 个或多个标记节点的列表后,正在弹出的线程将尝试 CAS 栈头。至少有一个线程同时弹出将成功,此后所有进入堆栈的读者将不再看到以前标记的节点。

成功更新列表的线程然后加载原子list.entries,并且基本上旋转加载atomic.exits,直到该计数器最终超过list.entries。这应该意味着“旧”版本列表的所有读者都已完成。然后,线程简单地释放它从列表顶部交换的标记节点列表。

因此,pop 操作的含义应该是(我认为)不会出现 ABA 问题,因为被释放的节点不会返回到可用的指针池,直到所有使用它们的并发读取器完成为止,显然还有内存回收问题出于同样的原因,也进行了处理。

无论如何,这是理论,但我仍然对实现摸不着头脑,因为它目前不起作用(在多线程情况下)。似乎我在其他问题中遇到了一些免费写入后的问题,但我很难发现问题,或者也许我的假设有缺陷,它就是行不通。

任何有关概念和调试代码的方法的见解都将不胜感激。

这是我当前的(损坏的)代码(使用 gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c 编译):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}

你提到你正在尝试解决 ABA 问题,但描述和代码实际上是试图解决一个更难的问题:内存回收 https://arxiv.org/abs/1712.06134问题。

这个问题通常出现在用没有垃圾回收的语言实现的无锁集合的“删除”功能中。核心问题是,从共享结构中删除节点的线程通常不知道何时可以安全地释放已删除的节点,因为其他读取可能仍然引用它。经常解决这个问题,作为一个副作用,also解决了 ABA 问题:具体来说,即使底层指针(和对象的状态)在此期间已被更改至少两次,最终还是原始指针,CAS 操作仍能成功value却呈现出完全不同的状态。

ABA 问题更容易,因为对于 ABA 问题有几种直接的解决方案,具体来说,这些解决方案不会导致“内存回收”问题的解决方案。从某种意义上说,可以检测位置修改的硬件(例如使用 LL/SC 或事务内存原语)可能根本不会出现问题,这也更容易。

也就是说,您正在寻找内存回收问题的解决方案,并且它还将避免 ABA 问题。

你的问题的核心是这样的说法:

成功更新列表的线程然后加载原子 list.entries,基本上旋转加载atomic.exits直到计数器 终于超过了list.entries。这应该意味着所有读者 列表的“旧”版本已经完成。线程然后简单地 释放它从顶部交换的标记节点列表 列表。

这个逻辑不成立。等待list.exits(你说原子退出但我认为这是一个错字,因为你只谈论list.exits其他地方)大于list.entries只告诉你现在已经有更多退出总数比有entries此时变异线程捕获了条目计数。然而,这些退出可能是由来来往往的新读者产生的:这根本不意味着所有老读者都读完了正如你所说!

这是一个简单的例子。首先是一个写作线程T1和一个阅读线程T2大约在同一时间访问该列表,所以list.entries是 2 并且list.exits为0。写入线程弹出一个节点,并保存当前值(2)list.entries并等待lists.exits大于 2。现在还有三个阅读线程,T3, T4, T5到达并快速阅读清单并离开。现在lists.exits是 3,并且满足您的条件并且T1释放节点。T2但它并没有去任何地方,并且因为它正在读取一个已释放的节点而爆炸!

你的基本想法是可行的,但你的两种反击方法绝对行不通。

这是一个经过充分研究的问题,因此您不必发明自己的算法(请参阅上面的链接),甚至不必编写自己的代码,因为诸如librcu https://liburcu.org/ and 并发工具包 https://github.com/concurrencykit/ck已经存在。

用于教育目的

If you wanted不过,为了使这项工作达到教育目的,一种方法是确保修改后进入的线程已开始使用一组不同的list.entry/exit柜台。一种方法是使用生成计数器,当编写者想要修改列表时,它会增加生成计数器,这会导致新的读者切换到另一组不同的列表。list.entry/exit柜台。

现在编剧只需要等待list.entry[old] == list.exists[old],这意味着所有的old读者已经离开。您也可以只使用每代一个计数器:您实际上并没有两个entry/exit计数器(尽管它可能有助于减少争用)。

当然,您知道管理每代单独计数器的列表有一个新问题......这看起来像构建无锁列表的原始问题!不过,这个问题要容易一些,因为您可能会对“飞行中”的代数设置一些合理的限制,然后将它们全部预先分配,或者您可能会实现一种更容易推理的有限类型的无锁列表因为添加和删除只发生在头部或尾部。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无锁堆栈实现想法 - 目前已损坏 的相关文章

  • 为 DocumentDb 设置自定义 json 转换器

    我正在使用类型化 DocumentQuery 从 Azure DocumentDb 集合中读取文档 from f in client CreateDocumentQuery
  • 无法在 QGLWidget 中设置所需的 OpenGL 版本

    我正在尝试在 Qt 4 8 2 中使用 QGLWidget 我注意到 QGLWidget 创建的默认上下文不显示 OpenGL 3 1 以上的任何输出 Qt wiki 有一个教程 http qt project org wiki How t
  • 如何使用C从http下载文件?

    最近几天我试图弄清楚如何从 URL 下载文件 这是我对套接字的第一个挑战 我用它来了解协议 所以我想在没有 cURL 库的情况下只用 C 语言来完成它 我搜索了很多 现在我可以打印页面的源代码 但我认为这与文件不同 我不必只将接收到的数据从
  • 并行运行多个任务

    我有一个代理列表 每个代理都会访问不同的站点并从站点中提取所需的数据 目前它一次只做一个 但我希望同时运行 10 20 个任务 这样它就可以一次性从 20 个站点下载 而不是只下载一个 这是我目前正在做的事情 private async T
  • 每个元素的 asp.net Web 表单自定义错误消息

    我创建了一个 Web 应用程序 表单 以及后端 SQL 插入和查询 目前我正在显示所有用户错误消息 div style padding 1em div
  • 如何在 C# 中以编程方式将行添加到 DataGrid?

    正如标题所述 我正在尝试使用 C 以编程方式将行添加到 DataGrid 但我似乎无法使其工作 这是我到目前为止所拥有的 I have a DataGrid declared as dg in the XAML foreach string
  • 如何在 Linux 上重新实现(或包装)系统调用函数?

    假设我想完全接管 open 系统调用 也许要包装实际的系统调用并执行一些日志记录 一种方法是使用 LD PRELOAD http scaryreasoner wordpress com 2007 11 17 using ld preload
  • C# 结构默认值

    我有一个方法 它接受一个包含许多具有基本数据类型的字段的结构 我想传递大部分默认值 但需要进行一些调整 但我了解结构声明中的基本字段不能包含默认值声明 例如struct S int a 42 现在是这样的 OptionsStruct opt
  • 大量互斥体对性能的影响

    假设我有一个包含 1 000 000 个元素的数组 以及多个工作线程 每个线程都操作该数组中的数据 工作线程可能会使用新数据更新已填充的元素 但每个操作仅限于单个数组元素 并且独立于任何其他元素的值 使用单个互斥锁来保护整个数组显然会导致高
  • 从图像创建半透明光标

    是否可以从图像创建光标并使其半透明 我目前正在拍摄自定义图像并覆盖鼠标光标图像 如果我可以将其设为半透明 那就太好了 但不是必需的 销售人员喜欢闪亮的 目前正在做这样的事情 Image cursorImage customImage Get
  • 如何在 C 中链接目标文件?失败并显示“架构 x86_64 的未定义符号”

    因此 我尝试在我的文件 file2 c 中使用另一个 C file1 c 文件中定义的函数 为了做到这一点 我包含了 file1 file1 h 的标头 但是 每当我尝试使用 gcc 编译文件时 我都会收到以下错误 Undefined sy
  • X 轴和 Z 轴上的 Quaternion.Slerp,无 Y 轴

    I am trying to rotate the Player about X Y and Z axis The Y axis should not move from last angle Example if I rotate 45
  • 更改私有模块片段是否会导致模块重新编译?

    On 此页面有关 C 20 模块功能 https www modernescpp com index php c 20 modules private module fragment and header units 我发现了这样的说法 借
  • C# 可以为控制台应用程序部分类“程序”类吗?

    我想知道是否可以将为任何控制台应用程序创建的默认 程序 类更改为部分类 我想这样做是因为我想要更好的组织 而不是将所有方法都放在按区域分类的 1 个文件中 对我来说 将某些方法类别放在单独的文件中会更有意义 我对分部类的理解是 它是多个文件
  • MINIX内部碎片2

    我正在用 C 语言编写一些软件 它递归地列出给定目录中的所有文件 现在我需要计算出内部碎片 我花了很长时间研究这个问题 发现 ext2 上的内部碎片只发生在最后一个块中 我知道理论上你应该能够从索引节点号获得第一个和最后一个块地址 但我不知
  • fgets溢出后如何清除输入缓冲区?

    当输入字符串超出其预定义限制时 我遇到了 fgets 的小问题 以下面的例子为例 for index 0 index lt max index printf Enter the d string index 1 if fgets input
  • 让 Windows 尝试读取文件

    我正在对 Windows 文件系统进行某种封装 当用户请求打开文件时 Windows 调用我的驱动程序来提供数据 在正常操作中 驱动程序返回缓存的文件内容 但是 在某些情况下 实际文件没有缓存 我需要从网络下载它 问题是是否有可能让 Win
  • 尝试后终于没有被调用

    由于某种原因 在我的控制台应用程序中 我无法运行我的finally 块 我编写这段代码是为了测试finally块是如何工作的 所以它非常简单 static void Main int i 0 try int j 1 i Generate a
  • 使用通用存储库模式和流畅的 nHibernate

    我目前正在开发一个中型应用程序 它将访问不同站点上的 2 个或更多 SQL 数据库等 我正在考虑使用类似的东西 http mikehadlow blogspot com 2008 03 using irepository pattern w
  • 使用空的weak_ptr作为参数调用map::count安全吗?

    打电话安全吗map count http www cplusplus com reference map map count on an 未初始化因此为空weak ptr http en cppreference com w cpp mem

随机推荐

  • 生成器不是迭代器吗?

    我有一个生成器 一个产生东西的函数 但是当试图将它传递给gensim Word2Vec我收到以下错误 类型错误 您不能将生成器作为句子参数传递 尝试迭代器 生成器不是迭代器的一种吗 如果没有 我如何从中创建一个迭代器 查看库代码 它似乎只是
  • 范围为“class”的 Pytest 装置不适用于“setup_class”方法

    我目前正在使用pytest addoption运行我的 API 测试 因此测试应该针对用户在命令行上使用的环境运行 在我的测试文件中 我试图实例化UsersSupport只上一次课 就通过了env争论 我的代码 测试 py import p
  • NSMutableData 的 mutableBytes 和 bytes 方法之间的区别

    两者都返回相同的指针 我知道 bytes属于NSData 为什么NSMutableData介绍 mutableBytes 是否只是为了代码清晰 以便更明显地访问可变数据 使用哪一个真的很重要吗 NSMutableData mydata NS
  • 使用ExternalProject_Add将Opus包含在Android中

    这可能很简单 我有一个使用 NDK 的 Android 项目 我想包括opus https opus codec org downloads 本机代码中的源代码 我尝试使用 CMake 的ExternalProject Add 属性 但我的
  • ASP.Net WebForms 路由 多个目的地的单一路由

    我正在考虑为我计划创建的新网站设置数据库路由 我一直在查看以下有关使用数据库中的FriendlyUrls 的教程 http www asp net web forms tutorials aspnet 45 getting started
  • 在 C 扩展中释放全局 VM 锁而不使用其他函数

    我不明白为什么在 Ruby C API 中释放或获取 GVL 时需要另一个间接级别 Both rb thread call without gvl and rb thread call with gvl 需要一个只接受一个参数的函数 但情况
  • DataTable 内的 Markdown 正在其周围添加段落

    当 DashTable 中使用 markdown 时 需要额外增加一个 p 添加标签使单元格和实习生所有行变大 import dash from dash import dash table app dash Dash app layout
  • 在三星上,Compose AlertDialog 始终采用全宽

    在我的配备 One UI 5 0 和 Android 13 的三星 Galaxy S22 上 撰写 AlertDialog 始终占据全宽 在其他设备上它的工作方式与预期一致 Compose版本是1 3 1 您只需下载即可重现此内容 我怀疑这
  • 为什么我的 WPF 应用程序禁用了拖放功能(即使当 AllowDrop 为 true 时)?

    我的 WPF 应用程序禁止从 Windows 资源管理器中删除文件 并显示停止标志光标 我尝试在主窗口和包含的控件上将AllowDrop属性 UIElement祖先的属性 设置为true 但完全没有运气 没有触发拖放事件 有什么想法或建议来
  • 区分具有未知功能的产品 - sympy

    我尝试了各种搜索 但找不到一个好的谷歌字符串来得出正确的结果 我有以下形式的产品 y x f x 其中 f 是 x 的未知函数 我希望 sympy 对 y 相对于 x 进行微分 有谁知道我该怎么做 怎么样 gt gt gt x sympy
  • jQuery .trigger('click') 在间隔函数内?

    这是一个改写的问题here https stackoverflow com questions 5031019 stuck on weird jquery error 经过一些测试后 我隔离了问题 但没有解决它的线索 无需阅读上一个问题 这
  • 为什么线程过程应该是静态的或成员函数

    为什么线程过程应该是静态的或成员函数 有什么正当理由吗 非静态成员变量有一个隐式的this编译器内部传递的参数 You have ClassInQuestion void threadFunc int 并且编译器内部创建了一个函数 void
  • DataTable.Merge 和 DataTable.ImportRow 不会更改 RowState

    我在 ADO NET 2 0 合并 导入数据时遇到问题 我需要将数据从一个通用表更新 插入到另一个表 并且两个表都保持相同的架构 以下代码在本地运行良好 但不会对数据库进行更改 OleDbDataAdapter localDA loadLo
  • mailgun 传入邮件事件获取附件 url

    我有一个节点端点 它接收 json 格式的传入电子邮件 其中包含来自 mailgun 的所有附件 附件位于 json 数组中 xxx com 用于隐私 attachments url https sw api mailgun net v3
  • 导航架构组件 - 将参数数据传递到 startDestination

    我有一个活动 A 启动活动 B 并向其传递一些意图数据 活动 B 托管来自新导航架构组件的导航图 我想将该意图数据作为参数传递给 startDestination 片段 如何做到这一点 好的 感谢 Google 团队的 Ian Lake 我
  • 正确传输和保护 Web 应用程序的用户密码

    我正在为我的硕士项目开发一个网络应用程序 这是一个供教授管理学生项目的系统 它使用Java作为服务器端代码 使用HSQLDB作为数据库 使用JSP作为表示层 并且运行在tomcat上 将存储的数据不包括任何敏感信息 学生 ID 财务信息等
  • FixIO 是做什么的?

    The System IO 文档 https hackage haskell org package base 4 5 0 0 docs System IO html包含一个神秘的 未记录的函数fixIO 它的来源 https hackag
  • 显示 Maven dependency:tree 中省略的版本?

    在 Eclipse 中 当我转到 Maven 依赖关系层次结构页面时 我得到的输出指出了哪些冲突导致版本被忽略 但是 如果我使用依赖 树 http maven apache org plugins maven dependency plug
  • purescript 中列表/数组中的类似记录类型

    有什么办法可以做类似的事情 first x 0 second x 1 y 1 both first second 这样both被推断为 x Int r 或类似的东西 我尝试过一些事情 x 3 Array forall r x Int r n
  • 无锁堆栈实现想法 - 目前已损坏

    我想出了一个想法 尝试实现一个无锁堆栈 该堆栈不依赖引用计数来解决 ABA 问题 并且还可以正确处理内存回收 它在概念上与 RCU 类似 并且依赖于两个功能 将列表条目标记为已删除 以及跟踪遍历列表的读者 前者很简单 它只使用指针的LSB