提升消费者进程中的共享内存和同步队列问题/崩溃

2023-12-12

我正在尝试从子进程消耗 C++ 中的同步队列。我在 C++ 中使用这个同步队列 () (http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html)

我修改了队列以在boost中可序列化,并且还替换了使用的boost::mutex io_mutex_使用进程间互斥锁(感谢@Sehe)boost::interprocess::interprocess_mutex io_mutex_并且当锁定时 我改变了每一行boost::mutex::scoped_lock lock(io_mutex_); to scoped_lock<interprocess_mutex> lock(io_mutex_);

template<class T>
class SynchronizedQueue
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & sQueue;
        ar & io_mutex_;
        ar & waitCondition;
    }
    ... // queue implementation (see [http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html][2])

}

在我的测试应用程序中,我创建同步队列并在其中存储该类的 100 个实例:

class gps_position
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & degrees;
        ar & minutes;
        ar & seconds;
    }
public:
 int degrees;
 int minutes;
 float seconds;

 gps_position() {};
 gps_position(int d, int m, float s) :
 degrees(d), minutes(m), seconds(s)
 {}
};

Consumer 和 Producer 之间的通用定义:

 char *SHARED_MEMORY_NAME = "MySharedMemory";
 char *SHARED_QUEUE_NAME  =  "MyQueue";
 typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

生产者进程代码:

    // Remove shared memory if it was created before
    shared_memory_object::remove(SHARED_MEMORY_NAME);
    // Create a new segment with given name and size
    managed_shared_memory mysegment(create_only,SHARED_MEMORY_NAME, 65536);
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)();
    //Insert data in the queue
    for(int i = 0; i < 100; ++i)  {
        gps_position position(i, 2, 3);
        myQueue->push(position);
    }
    // Start 1 process (for testing for now)
    STARTUPINFO info1={sizeof(info1)};
    PROCESS_INFORMATION processInfo1;
    ZeroMemory(&info1, sizeof(info1));
    info1.cb = sizeof info1 ; //Only compulsory field
    ZeroMemory(&processInfo1, sizeof(processInfo1));
    // Launch child process
    LPTSTR szCmdline = _tcsdup(TEXT("ClientTest.exe"));
    CreateProcess(NULL, szCmdline, NULL, NULL, TRUE, 0, NULL, NULL, &info1, &processInfo1);
    // Wait a little bit ( 5 seconds) for the started client process to load
    WaitForSingleObject(processInfo1.hProcess, 5000);

    /* THIS TESTING CODE WORK HERE AT PARENT PROCESS BUT NOT IN CLIENT PROCESS
    // Open the managed segment memory
    managed_shared_memory openedSegment(open_only, SHARED_MEMORY_NAME);
    //Find the synchronized queue using it's name
    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    while (true) {
        if (myQueue->pop(position)) {
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }*/


    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) continue;

    // Close process and thread handles. 
    CloseHandle( processInfo1.hThread );

我的消费者代码如下:

    //Open the managed segment memory
    managed_shared_memory segment(open_only, SHARED_MEMORY_NAME);
    //Find the vector using it's name
    MySynchronisedQueue *myQueue = segment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    // Pop each position until the queue become empty and output its values
    while (true)
    {
        if (myQueue->pop(position)) { // CRASH HERE
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }

当我运行创建队列的父进程(生产者)并创建子(消费者)进程时,子进程在尝试从队列中“弹出”时崩溃。

我在这里做错了什么?任何想法 ?感谢您的任何见解。这是我使用 boost 和共享内存创建的第一个应用程序。

我的目标是能够从多个进程消耗这个队列。在上面的示例中,我仅创建一个子进程,以确保它在创建其他子进程之前首先正常工作。这个想法是队列将提前被项目填充,并且多个创建的进程将从队列中“弹出”项目而不会相互冲突。


到更新后的代码:

  • 如果你要共享队列,你应该使用 interprocess_mutex ;这意味着一系列相关的变化。
  • 如果您要共享队列,您的队列应该使用共享内存分配器
  • 应在互斥锁下提出条件,以便在所有平台上实现可靠的行为
  • 你没能锁进去toString()。即使您复制了集合,但这还不够,因为容器可能会在复制过程中被修改。
  • 队列设计很有意义(返回的“线程安全”函数有什么用?empty()?在处理返回值之前它可能不再为空/只是为空...这些被称为竞争条件并导致很难追踪错误
  • Boost Serialization 与什么有关系?看起来只是为了混淆图片,因为它不是必需的并且未被使用.
  • 对于 Boost Any 也是如此。为什么是any用于toString()?由于队列的设计,typeid 始终是gpsposition无论如何。
  • 同样对于boost::lexical_cast<>(如果你已经有了字符串流,为什么还要进行字符串连接?)
  • Why are empty(), toString(), sizeOfQueue() not const?

我强烈推荐使用boost::interprocess::message_queue. This seems成为您真正想要使用的。

这是一个修改版本,将容器放入共享内存中并且它可以工作:

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <sstream>

namespace bip = boost::interprocess;

template <class T> class SynchronizedQueue {

  public:
    typedef bip::allocator<T, bip::managed_shared_memory::segment_manager> allocator_type;
  private:
    bip::deque<T, allocator_type> sQueue;
    mutable bip::interprocess_mutex io_mutex_;
    mutable bip::interprocess_condition waitCondition;
  public:
    SynchronizedQueue(allocator_type alloc) : sQueue(alloc) {} 

    void push(T element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        sQueue.push_back(element);
        waitCondition.notify_one();
    }
    bool empty() const {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.empty();
    }
    bool pop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        if (sQueue.empty()) {
            return false;
        }

        element = sQueue.front();
        sQueue.pop_front();

        return true;
    }
    unsigned int sizeOfQueue() const {
        // try to lock the mutex
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.size();
    }
    void waitAndPop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        while (sQueue.empty()) {
            waitCondition.wait(lock);
        }

        element = sQueue.front();
        sQueue.pop();
    }

    std::string toString() const {
        bip::deque<T> copy;
        // make a copy of the class queue, to reduce time locked
        {
            boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
            copy.insert(copy.end(), sQueue.begin(), sQueue.end());
        }

        if (copy.empty()) {
            return "Queue is empty";
        } else {
            std::stringstream os;
            int counter = 0;

            os << "Elements in the Synchronized queue are as follows:" << std::endl;
            os << "**************************************************" << std::endl;

            while (!copy.empty()) {
                T object = copy.front();
                copy.pop_front();
                os << "Element at position " << counter << " is: [" << typeid(object).name()  << "]\n";
            }
            return os.str();
        }
    }
};

struct gps_position {
    int degrees;
    int minutes;
    float seconds;

    gps_position(int d=0, int m=0, float s=0) : degrees(d), minutes(m), seconds(s) {}
};

static char const *SHARED_MEMORY_NAME = "MySharedMemory";
static char const *SHARED_QUEUE_NAME  =  "MyQueue";
typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

#include <boost/interprocess/shared_memory_object.hpp>
#include <iostream>

void consumer()
{
    bip::managed_shared_memory openedSegment(bip::open_only, SHARED_MEMORY_NAME);
    
    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;

    while (openedQueue->pop(position)) {
        std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
        std::cout << "\n";
    }
}

void producer() {
    bip::shared_memory_object::remove(SHARED_MEMORY_NAME);
    
    bip::managed_shared_memory mysegment(bip::create_only,SHARED_MEMORY_NAME, 65536);

    MySynchronisedQueue::allocator_type alloc(mysegment.get_segment_manager());
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)(alloc);

    for(int i = 0; i < 100; ++i)          
        myQueue->push(gps_position(i, 2, 3));

    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) 
        continue;
}

int main() {
    producer();
    // or enable the consumer code for client:
    // consumer();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

提升消费者进程中的共享内存和同步队列问题/崩溃 的相关文章

随机推荐

  • 启动多个线程并重新启动它们

    我正在尝试编写一个系统 在其中创建 x 个工作线程 这些线程将在不同的时间完成它们的工作 当它们中的任何一个完成工作时 我将检查它们的输出并再次重新启动它们 将运行的线程数保持在 x 左右 我将进行多次任意迭代 因此 基本上控制器线程将启动
  • 如何在spark结构化流连接中选择最新记录

    我使用的是spark sql 2 4 x版本 datastax spark cassandra connector用于Cassandra 3 x版本 和卡夫卡一起 我有货币样本的汇率元数据如下 val ratesMetaDataDf Seq
  • php 中的字符串到压缩流

    我有一个带有数据库的处理服务器和一个服务数据库 以较低的带宽成本提供文件 在处理服务器上 php 无法创建文件 因此一切都必须通过流完成和 或保留在内存中 然后才能发送到另一台服务器进行下载 几天前 我发现了 php memory 的流抽象
  • 如何在 ggplotly() 中使分组热图的列宽全部相同

    我有这个数据框 gene symbol lt c DADA SDAASD SADDSD SDADD ASDAD XCVXCVX EQWESDA DASDADS SDASDASD DADADASD sdaadfd DFSD SADADDAD
  • 为什么 SQLSTATE[HY000]: 一般错误?

    这是一个用于注册组长及其伙伴的代码
  • 用于年龄验证的正则表达式,仅使用 Javascript 接受 0-200 之间的年龄

    我想要一个允许用户仅输入 0 到 200 之间的数字的正则表达式 我已经尝试过这个但它不起作用 var age regex S 0 9 0 3 您可以比较数值本身 而不是正则表达式 var ageValue 50 get the input
  • Matplotlib:外部图例,分布在多个子图中

    我有一个包含 2 个子图的图 所有子图共享相应的图表 即相同颜色的相同标签 我想在图的顶部有一个图例 延伸到两个子图 类似于下面的代码 import numpy as np import matplotlib pyplot as plt x
  • 请求标头中的 JWT 与接收 .Net Core API 时不一样

    当我从 Angular 应用程序向 Net Core 2 API 发出请求时 JWT 与请求标头中发送的 JWT 不同 启动 cs public class Startup public Startup IHostingEnvironmen
  • 从字符串中分割特殊字符和字母

    我有一个字符串值 我包含字母 特殊字符 数字和空格的组合 但我只想检索数字 my code Dim str1 As String 123456habAB Dim str2 As String Regex Replace str1 gt
  • 双截断输出的 7 个字符

    double fat 0 2654654645486684646846865584656566554566556564654654899866223625564668186456564564664564 cout lt
  • 无法删除被授予连接数据库的角色

    我正在使用 PostgreSQL 10 4 我发现了一个奇怪的行为 如果我们创建一个角色并将其授予CONNECT数据库 CREATE ROLE dummy GRANT CONNECT ON DATABASE test TO dummy 那么
  • 引用方法内的对象时出现问题

    internal class Program public class Creature public int health public int damage public int coins public static void Hit
  • 如何将现有的 iPhone 应用程序移植到 iPad

    我有一个 iPhone 应用程序 现在我想将该应用程序转换为可在所有 iPhone iPod iPad 设备上运行的通用应用程序 那么 从哪里开始 我需要做哪些事情呢 任何帮助 链接 示例应用程序 任何东西 都将受到高度赞赏 提前致谢 我最
  • iPhone OS 应用程序的可用内存

    是否有函数或常量定义 iPhone OS 中应用程序的可用内存量 我正在寻找一种独立于设备 iPod touch iPhone iPad 的方式来了解应用程序还剩多少内存 该函数将返回可用内存 以字节为单位 import
  • 如何定义二维数组?

    我想定义一个没有初始化长度的二维数组 如下所示 Matrix 但这给出了一个错误 IndexError 列表索引超出范围 从技术上讲 您正在尝试为未初始化的数组建立索引 在添加项目之前 您必须首先使用列表初始化外部列表 Python 称之为
  • 符号函数矩阵

    我想在 Matlab 中定义一个符号函数 而不是变量 矩阵 在工作区中 我希望它成为大小为 N M 的类 symfun 的元素 其中N and M是正整数 你不能创建一个矩阵symfun类元素 可能出于同样的原因无法创建函数句柄矩阵 但您可
  • 如何在 PHP 中发送 HTTP 请求并检索响应(通过标头微调)?

    我必须向 URL 发送 HTTP 请求并检索响应和标头 我不仅对页面内容感兴趣 而且对所有标题也感兴趣 最佳解决方案是什么 插座 PEAR 库不可访问 PHP 配置不可编辑 你应该使用curl 文档中的快速示例
  • 如何在与主程序不同的线程中编写套接字服务器(使用 gevent)?

    我正在开发一个 Flask gevent WSGIserver Web 服务器 它需要使用 XML 通过两个套接字与硬件设备进行通信 在后台 一个套接字由客户端 我的应用程序 启动 我可以向设备发送 XML 命令 设备在不同的端口上应答并发
  • Objective-C / C 从 SecKeyRef 中提取私钥(模数)

    我需要一种干净的方法来提取我的服务器公钥并将其与本地数据进行比较 以防止将来密钥过期 更新 但我似乎无法获取 256 位密钥或将其表示为有用的数据为了比较 这是我到目前为止所拥有的 BOOL trustCertFromChallenge N
  • 提升消费者进程中的共享内存和同步队列问题/崩溃

    我正在尝试从子进程消耗 C 中的同步队列 我在 C 中使用这个同步队列 http www internetmosquito com 2011 04 making thread safe queue in c i html 我修改了队列以在b