如何从多个线程安全地写入套接字?

2023-12-08

我正在使用 asio (非升压)创建一个 TCP 服务器,虽然我的代码可以工作,但它没有正确完成,因为我正在调用asio::async_write来自多个线程。我think我应该使用股线,但我读得越多,我就越迷失。

#include <cstdlib>
#include <iostream>
#include <utility>
#include <thread>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"

using asio::ip::tcp;

class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {
    }

    void start()
    {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

private:
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    buffer_.append(data_, length);
                    size_t pos;
                    while ((pos = buffer_.find('\0')) != std::string::npos)
                    {
                        std::string message = buffer_.substr(0, pos);
                        buffer_.erase(0, pos + 1);

                        std::thread(&session::process_message, this, message).detach();
                    }

                    do_read();
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Read error: " << ec.message() << '\n';
                }
            });
    }

    void do_write(std::string message)
    {
        auto self(shared_from_this());
        asio::async_write(socket_, asio::buffer(message),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Write error: " << ec.message() << '\n';
                }
            });
    }

    void process_message(std::string message) {
        std::string response = handler.processMessage(message);
        do_write(response);
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
    std::string buffer_;
    MessageHandler handler;
};

class server
{
public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
        socket_(io_context)
    {
        do_accept();
    }

private:
    void do_accept()
    {
        acceptor_.async_accept(socket_,
            [this](std::error_code ec)
            {
                if (!ec)
                {
                    std::make_shared<session>(std::move(socket_))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit()
{
    try
    {
        asio::io_context io_context;

        server s(io_context, 0);

        io_context.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << '\n';
    }
}


您只有 1 个线程运行 IO 服务。一切都在隐含的链上(为什么使用 boost::asio 时每个连接都需要链?),无需担心,直到您开始使用新线程。

那么,最简​​单的修复似乎是确保发送回复也发生在 IO 服务上:

void process_message(std::string const& message) {
    std::string response = handler.processMessage(message);
    post(socket_.get_executor(),
         std::bind(&session::do_write, shared_from_this(), response));
}

现在,如果您希望能够在多个线程上运行 IO 服务,您只需确保套接字使用链执行器即可。

HOWEVER

这并不能保证您不会看到重叠的 async_write 操作,因为处理传入消息的速度可能高于发送消息的速度。因此通常的解决方案是

Queueing

在我的示例中,我通常将此 FIFO 队列称为“outbox_”,并且我更喜欢使用deque出于迭代器/引用稳定性的原因(参见C++ 容器的迭代器失效规则):

void do_write(std::string message)
{
    outbox_.push_back(std::move(message)); // assumed on (implicit) strand
    if (outbox_.size() == 1) {
        write_loop();
    }
}

void write_loop() {
    if (outbox_.empty())
        return;

    asio::async_write( //
        socket_, asio::buffer(outbox_.front()),
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t /*length*/) {
            if (!ec) {
                outbox_.pop_front();
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << '\n';
            }
        });
}

Demo

这是带有存根 message.h 的固定列表。

It also greatly通过使用现有的简化读取/缓冲区处理async_read_until组合操作,它可以完成您手动编写的所有操作。

住在科里鲁

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
    #include "messages.h"
#else // mock messages.h
    #include <boost/lexical_cast.hpp>
    #include <iomanip>
    struct MessageHandler {
        std::string initialMessage() const { return "Initial\n"; }
        std::string processMessage(std::string const& req) const {
            return "Processed " +
                boost::lexical_cast<std::string>(std::quoted(req)) + "\n";
        }
    };
#endif

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(buffer_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    std::thread(&session::process_message, this, buffer_.substr(0, length - 1)).detach();
                    buffer_.erase(0, length);

                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(std::string message)
    {
        outbox_.push_back(std::move(message)); // assumed on (implicit) strand
        if (outbox_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Write error: " << ec.message() << std::endl;
                }
            });
    }

    void process_message(std::string const& message) {
        std::string response = handler.processMessage(message);
        // dispatch/post to executor because we are on a different thread
        post(socket_.get_executor(),
             std::bind(&session::do_write, shared_from_this(), response));
    }

    tcp::socket socket_;
    std::string buffer_;
    std::deque<std::string> outbox_;
    MessageHandler handler;
};

class server
{
  public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
        socket_(io_context)
    {
        do_accept();
    }

  private:
    void do_accept()
    {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            if (!ec) {
                std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit() {
    try {
        asio::io_context io_context;

        server s(io_context, 8989);

        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() { serverInit(); }

当发送最后一次突发请求时:

printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1

正确打印,例如:

Accepted 127.0.0.1:34862

客户端收到例如:

Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"

奖励:添加股线

最小的变化:

class server
{
public:
  server(asio::any_io_executor ex, unsigned short port)
      : acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
      do_accept();
  }

private:
    void do_accept()
    {
        acceptor_.async_accept(make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket&& s) {
            if (!ec) {
                std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(s))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

void serverInit() {
    try {
        asio::thread_pool io_context;

        server s(io_context.get_executor(), 8989);

        io_context.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

现场演示:

enter image description here

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从多个线程安全地写入套接字? 的相关文章

随机推荐

  • 多次调用异步函数

    所以我有一个方法 我想在循环中多次调用它 这是函数 function PageSpeedCall callback var pagespeedCall https www googleapis com pagespeedonline v4
  • Meteor:Tracker.autorun/observeChanges 和集合未按预期工作

    我对使用流星很陌生 所以我希望得到关于这些函数如何工作以及我应该如何使用它们的非常基本的解释 否则 如果有一种方法更适合我希望实现的目标 那么我将不胜感激 我希望实现的功能 我有一个 Mongo 集合 其中包含分配给特定用户的文档中的数字值
  • 限制带宽速度

    我编写了一个将本地文件夹与在线文件夹同步的应用程序 但它占用了我所有的带宽 我如何限制应用程序使用的带宽量 以编程方式 看一眼http www codeproject com KB IP MyDownloader aspx 他使用的是众所周
  • WP 8.1 从下到上无限滚动

    我探索过I支持增量加载并看到质谱样品 and 其他例子用于无限滚动行为 但我想要从下到上滚动 其中项目在从下到上滚动时添加到顶部 编辑 我已经找到了解决此问题的方法 我将列表视图旋转了 180 度 将数据模板旋转了 180 度 这帮助我实现
  • 修剪 R 中的数据,去掉“*”

    我有一个数据集 如下所示 gt data lt c IGHV1 2 02 F or IGHV1 2 03 F IGHV3 23 01 F or gt IGHV3 23 04 F IGHV2 70 01 F IGHV7 4 1 01 例如 我
  • 如何将 uint8_t 与 I/O 流一起使用,同时避免 char 行为?

    考虑这个简单的 C 程序 include
  • Python pyqt 多线程脉冲进度条

    请耐心解答我的问题 因为我是初学者 我在 pyqt 中实现进度条时遇到问题 我看到的所有示例都没有真正解释如何正确实现它 并由此而来example和这个example我在某种程度上使其工作正常 但它仍然挂起 我有这个代码 class Win
  • Javascript 使用变量通过按钮创建 url

    我的网站上有一个按钮 单击该按钮会生成一个单词 然后在 url 调用中使用该单词来下载特定文件
  • 如何获取卷 GUID

    我正在使用 win32 apiC 我想知道如何使用 设备路径 获取卷 GUID 我的设备如下所示 usb vid 04f2 pid 0111 5 39fe81e 0 2 a5dcbf10 6530 11d2 901f 00c04fb951e
  • JavaScript cookie 删除

    如果我用 Javascript 创建一个 cookiedocument cookie unseen 当我离开此页面时如何删除它 这是我在页面上创建的唯一 cookie 运行这个 document cookie unseen expires
  • PHP:Imagick:合并透明图像

    我想将透明PNG合并到另一个图像中 但是PNG的边框不会按照需要更改为透明
  • 跟踪 GA 中的主题标签和查询字符串

    我有一些贴纸 其中包含我网站的 URL 二维码 如下所示 我会把它贴在街上 正如你所看到的 如果有人阅读了这个二维码 他就会转发到http issocial net qr page 现在我想跟踪通过此二维码贴纸访问我的网站的人 遗憾的是 G
  • 在 MATLAB 中向现有矩阵添加新列?

    我有一个包含两列的矩阵 其中一列是日期 另一列是我必须执行一些操作的数量 我想在现有矩阵中添加第三列 我打算通过将第三列表示为列向量 然后将其添加到我现有的矩阵中来解决此问题 尽管我不确定如何将另一列添加到矩阵中 任何帮助将不胜感激 对于第
  • 无法激活 IDL 中定义的投影类型

    我试图在 IDL 中定义 Windows 运行时类型 并使用其投影类型 从默认生成的空白应用程序UWP 项目 称为 空白应用程序 我补充说 我的控件 idl namespace BlankApp default interface runt
  • Android推送服务,实现gcm服务器端

    我对 Android 推送世界还是个新手 几天来我一直在挣扎 我毫无问题地创建并实现了它的 GCM 客户端 我还创建了我的谷歌云项目 启用了android推送notif s并得到了我的Project Number Project ID an
  • FluentNHibernate 字典映射

    使用 Fluent NHibernate 映射简单 Dictionary 属性的最佳方法是什么 public class PersistedData public virtual IDictionary
  • 如何使用 TFS 2010 SDK 获取分支的所有未合并变更集?

    目前我有 2 个分支 开发和发布 是否可以获得从开发到发布的所有未合并变更集 目前我们使用默认的合并向导 然而它有一个很大的限制 它不能按用户过滤 因此 我正在考虑构建一个应用程序 它将所有未合并的变更集从开发拉到发布 并允许我按用户过滤这
  • Erlang VM (BEAM) 是如何构建列表的?

    当我在 Erlang 中创建列表时 例如在 Erlang shell 中 1 gt 1 2 据我了解 在虚拟机中 该列表将表示为单链表 Erlang 运行时如何创建这个结构 例如 它的构造是这样的 在内存中创建一个结构来保存终止列表的列表
  • 具有相同 GroupId 的多个 Kafka 监听器都接收消息

    我在 Spring Boot 应用程序中配置了一个 kafka 监听器 如下所示 KafkaListener topicPartitions TopicPartition topic data all partitions 0 1 2 gr
  • 如何从多个线程安全地写入套接字?

    我正在使用 asio 非升压 创建一个 TCP 服务器 虽然我的代码可以工作 但它没有正确完成 因为我正在调用asio async write来自多个线程 我think我应该使用股线 但我读得越多 我就越迷失 include