如何使 Zeromq PUB/SUB 删除旧消息而不是新消息(用于实时提要)?

2024-02-24

说我有一个PUB服务器zmq_send()的实时消息SUB客户。如果客户很忙而无法zmq_recv()消息足够快,那么消息将在客户端(和/或服务器)中缓冲。

如果缓冲区变得太大(高水位线),则新消息将被丢弃。对于实时消息来说,这与人们想要的相反。应删除旧消息,为新消息腾出位置。

有什么方法可以做到这一点吗?

理想情况下我想要SUB客户端的接收队列为空或仅包含最新消息。当收到新消息时,它将替换旧消息。 (我猜这里的问题是客户端会阻止zmq_recv()当队列为空时,这样做会浪费时间。 )

那么实时提要通常是如何实现的ZeroMQ?


我会在这里回答我自己的问题。设置 ZMQ_CONFLATE“仅保留最后一条消息”似乎很有希望,但它不适用于订阅过滤器。它只在队列中保留一条消息。如果您有多个过滤器,则其他过滤器类型的旧消息和新消息都会被丢弃。

同样,zeromq 指南建议简单地杀死慢速订阅者,但这似乎不是现实的解决方案。让具有不同阅读速度的订阅者订阅同一个快速发布者,应该是正常的用例。其中一些订阅者可能生活在慢速计算机上,其他人可能生活在快速计算机上,等等。ZeroMQ 应该能够以某种方式处理这个问题。

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

我最终在客户端手动删除旧的排队消息。看起来效果很好。我通过这种方式向客户端订阅了不到 3 毫秒的消息(通过 tcp localhost)。即使在队列中有 5000 条 10 秒的旧消息,位于后面那几条实时消息前面的情况下,这种方法也有效。这对我来说已经足够好了。

我不禁认为这是图书馆应该提供的东西。它可能可以做得更好。

无论如何,这是客户端,旧消息丢失,代码:

bool Empty(zmq::socket_t& socket) {
    bool ret = true;
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
    zmq::poll(&poll_item, 1, 0); //0 = no wait
    if (poll_item.revents & ZMQ_POLLIN) {
        ret = false;
    }
    return ret;
}

std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
    std::vector<std::string> ret;

    struct MessageTmp {
        int id_ = 0;
        std::string data_;
        boost::posix_time::ptime timestamp_;
    };

    std::map<int, MessageTmp> msg_map;

    int read_msg_count = 0;
    int time_in_loop = 0;
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
    do {
        read_msg_count++;

        //msg format sent by publisher is: filter, timestamp, data
        MessageTmp msg;
        msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
        msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
        msg.data_ = s_recv(socket_sub);

        msg_map[msg.id_] = msg;

        auto now = boost::posix_time::microsec_clock::universal_time();
        time_in_loop = (now - start_of_loop).total_milliseconds();
        if (time_in_loop > timeout_ms) {
            std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
            break;
        }
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) {
        std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
    }

    for (const auto &pair: msg_map) {
        const auto& msg_tmp = pair.second;

        auto now = boost::posix_time::microsec_clock::universal_time();
        auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();

        if (message_age_ms > timeout_ms) {
            std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
        }
        else {
            std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
            ret.push_back(msg_tmp.data_);
        }
    }

    return ret;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使 Zeromq PUB/SUB 删除旧消息而不是新消息(用于实时提要)? 的相关文章

  • ZeroMQ:我希望发布-订阅删除旧消息以支持新消息

    我正在使用 ZeroMQ 发布 订阅套接字来连接两个进程 发布过程是一个传感器 刷新率比订阅过程快得多 我希望订阅过程仅使用队列中最新的消息 并完全忽略较旧的消息 我尝试为订阅者设置高水位线 但似乎有所下降newer消息而不是旧消息 是否有
  • 检测 ZeroMQ 队列中丢弃的消息

    由于似乎无法查询 检查底层 ZeroMQ 队列 缓冲区套接字以查看它们的利用率 因此是否有某种方法可以检测在发送 排队时由于发布者套接字中的缓冲区已满而导致消息被丢弃的情况 例如 如果发布者队列已满 则zmq send操作只会删除该消息 基
  • ZeroMQ - 多个发布者和监听器

    我刚刚开始了解并尝试 ZeroMQ 我不清楚如何在两个以上的参与者 发布者和订阅者 之间进行双向通信 以便每个组件都能够在 MQ 上读取和写入 这将允许创建事件驱动的架构 因为每个组件都可以侦听一个事件并回复另一个事件 有没有办法直接使用
  • ZMQ PUB 发送文件

    我第一次尝试 PY ZMQ 想知道是否可以使用 PUB SUB 发送完整的文件 二进制 我需要向许多订阅者发送数据库更新 我看到了短信的示例 但没有看到文件 是否可以 出版商 您应该能够使用 zmq 和 PUB SUB 模式将文件分发给多个
  • NanoMsg (NNG) 和 FlatBuffers 是否适合该项目?

    大声喊出我们是否应该考虑更好的事情 我正在寻找一种非常快速且简单的方法来获取多个程序 例如 5 个 每个程序都在私有 OpenStack 云上的单独节点上运行以相互通信 数据包将是短 C 结构 小于 100 字节 交通流量将会较少 可能低于
  • 每个线程或每个调用一个 ZeroMQ 套接字?

    众所周知 ZeroMQ套接字不得共享应用程序线程之间 context t但实例可以 我有一个多线程应用程序 我想让每个线程不时与一个线程交换消息REQ REP socket 对方 事件 异常等 取决于他们正在做什么 他们正在做非 ZeroM
  • 在 Xamarin 中使用 ZeroMQ

    我有一个由服务器和客户端组成的应用程序 服务器是 C C 应用程序 客户端是面向 Windows Android 和 iOS 的跨平台 Xamarin 应用程序 服务器部分和客户端部分使用 ZeroMQ 消息进行通信 我尝试了当前的 C 实
  • 如何使用 ZMQ 发送/接收通过 Protocol Buffers 序列化的二进制数据

    我需要在 ZMQ 套接字上发送一个对象 用 GPB 序列化 目前该代码有一个额外的副本 如何直接将序列化数组写入message ts data ABT CommunicationProtocol introPacket Fill the p
  • 如何监控ZeroMQ服务器是否存在?

    我想在发送 ZeroMQ 请求之前检查服务器的存在 状态 但我不知道该怎么做 Q 我想在发送 ZeroMQ 请求之前检查服务器的存在 状态 解决方案是设置并使用zmq socket monitor Read one event off th
  • 未找到类 ZMQContext

    我在 Ubuntu 14 04 的虚拟机内的 nginx 1 4 6 和 php 5 5 上运行 Web 服务器 并且需要安装 ZeroMQ 扩展 我已按照以下说明进行操作ZMQ http zeromq org area download
  • NODE_MODULE_VERSION 46。此版本的 Node.js 需要 NODE_MODULE_VERSION 64。请尝试重新编译或重新安装

    我正在尝试执行提供给我的节点应用程序 它应该可以正常工作 我已尝试运行它 但无法修复此错误 seba vps92941 services drivetech node awto js home seba services drivetech
  • 了解 Zeromq Java 绑定

    我正在研究 Zeromq 作为 Java 项目中的消息传递解决方案 但我发现有关 Java 绑定的说明有点难以遵循 http www zeromq org bindings java http www zeromq org bindings
  • 负载测试 ZeroMQ (ZMQ_STREAM) 以查找它可以处理的最大并发用户数

    有没有人有任何实际场景对 ZMQ 套接字进行负载测试以获得最大数量 他们可以处理的 并发用户 不是吞吐量 看起来 ZeroMQ 在 FD 限制方面存在一些严重问题 场景是 有许多 Web 服务器框架吹嘘它们可以处理数百万个并发用户 现在 如
  • clrzmq 在 Xamarin Studios/C# 应用程序中找不到 libzmq

    我在 Mac 上使用 Xamarin Studio clrzmq通过 NuGet 包含 libzmq dll 上的 clrzmq 引用 我的应用程序编译得很好 但是当我尝试运行它时 我得到了这个 Unhandled Exception Sy
  • 将 ZeroMQ 与 C# 和 inproc 传输一起使用

    我正在尝试 ZeroMQ 并试图得到某物在职的 我的第一个想法是使用 inproc 传输设置 REP REQ 看看是否可以在两个线程之间发送消息 下面的大部分代码取自 clzmq 示例 但它似乎不起作用 服务器和客户端都绑定到传输 但是当客
  • ZMQ 模式经销商/路由器心跳

    我在客户端有一个经销商套接字 它连接到服务器端的路由器套接字 我经常看到心跳机制 服务器定期向客户端发送消息 以便客户端知道自己是否正确连接到服务器 以便客户端在一段时间内没有收到消息时可以重新连接 例如这里的偏执海盗模式 http zgu
  • REQ/REP 模式中的 ZeroMQ FiniteStateMachineException

    我有两个简单的组件 它们应该使用 REQ REP ZeroMQ 模式相互通信 服务器 REP Socket 是使用 pyzmq 在 Python 中实现的 import zmq def launch server print Launchi
  • AttributeError:“str”对象没有属性“decode”

    我将使用 pip 软件包安装程序在我的电脑上安装 ZeroMQ 库 我用https learning 0mq with pyzmq readthedocs org en latest pyzmq basics html installati
  • Redis Cluster 与 Pub/Sub 中的 ZeroMQ,用于水平扩展的分布式系统

    如果我要设计一个巨大的分布式系统 其吞吐量应随系统中的订阅者数量和通道数量线性扩展 哪个会更好 1 Redis集群 仅适用于Redis 3 0 alpha 如果是集群模式 您可以在一个节点上发布并在另一个完全不同的节点上订阅 消息将传播并到
  • Python zmq SUB 套接字未接收 MQL5 Zmq PUB 套接字

    我正在尝试在 MQL5 中设置一个 PUB 套接字 并在 Python 中设置一个 SUB 套接字来接收消息 我在 MQL5 中有这个 include

随机推荐