Boost::Asio:为什么 async_write 在通过给定套接字发送缓冲区时会截断缓冲区?

2024-02-11

我目前正在尝试设计一个相当简单的boost::asio服务器。我的第一个单元测试相当简单:发送 JSON 请求{"COMMAND": "ADD_1", "VALUE" : 1}并收到以下回复:

{
    "SUCCESS" : true,
    "VALUE" : 2
}

然而,相反,回复在被截断后被截断一个字符。read从插座:

Reply is: {
    "SUCCESS" : true,
    "VALUE" : 2

Process finished with exit code 0

写入套接字的代码相当简单,一个类的成员函数RequestContext:

    void RequestContext::DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        auto self(this->shared_from_this());
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // If json parsing failed.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

我已经证实ProcessRequest返回正确的值,所以问题显然是async_write。我尝试将第二个参数的值增加到async_write,但好像没有什么效果。我究竟做错了什么?

下面是一个最小的可重现示例:

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <json/json.h>

using boost::asio::ip::tcp;
using boost::system::error_code;
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
    /**
     * Initialize class with value n to add sub from input values.
     *
     * @param n Value to add/sub from input values.
     */
    explicit RequestClass(int n) : n_(n) {}

    /// Value to add/sub from
    int n_;

    /**
     * Add n to value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] + n.
     */
    [[nodiscard]] Json::Value add_n(const Json::Value &request) const
    {
        Json::Value resp;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() + this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }

    /**
     * Sun n from value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] - n.
     */
    [[nodiscard]] Json::Value sub_n(const Json::Value &request) const
    {
        Json::Value resp, value;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() - this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }
};

typedef std::function<Json::Value(RequestClass, const Json::Value &)> RequestClassMethod;

template<class RequestHandler, class RequestClass>
class RequestContext :
    public std::enable_shared_from_this<RequestContext<RequestHandler,
                                                       RequestClass>>
{
public:
    typedef std::map<std::string, RequestHandler> CommandMap;

    RequestContext(tcp::socket socket, CommandMap commands,
                   RequestClass *request_class_inst)
        : socket_(std::move(socket))
        , commands_(std::move(commands))
        , request_class_inst_(request_class_inst)
        , reader_((new Json::CharReaderBuilder)->newCharReader())
    {}

    void Run()
    {
        DoRead();
    }

    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::socket socket_;
    RequestClass *request_class_inst_;
    CommandMap commands_;
    /// Reads JSON.
    const std::unique_ptr<Json::CharReader> reader_;
    /// Writes JSON.
    Json::StreamWriterBuilder writer_;
    bool continue_ = true;
    char data_[2048];

    void DoRead()
    {
        auto self(this->shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, 2048),
                                [this, self](error_code ec, std::size_t length)
                                {
                                  if (!ec)
                                  {
                                      DoWrite(length);
                                  }
                                });
    }

    void DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        auto self(this->shared_from_this());
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // If json parsing failed.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

    Json::Value ProcessRequest(Json::Value request)
    {
        Json::Value response;
        std::string command = request["COMMAND"].asString();

        // If command is not valid, give a response with an error.
        if(commands_.find(command) == commands_.end()) {
            response["SUCCESS"] = false;
            response["ERRORS"] = "Invalid command.";
        }
            // Otherwise, run the relevant handler.
        else {
            RequestHandler handler = commands_.at(command);
            response = handler(*request_class_inst_, request);
        }

        return response;
    }

};

template<class RequestHandler, class RequestClass>
class Server {
public:
    typedef std::map<std::string, RequestHandler> CommandMap;

    Server(boost::asio::io_context &io_context, short port,
           const CommandMap &commands,
           RequestClass *request_class_inst)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , commands_(commands)
        , request_class_inst_(request_class_inst)
    {
        DoAccept();
    }

    ~Server()
    {
        Kill();
    }

    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::acceptor acceptor_;
    bool continue_ = true;
    CommandMap commands_;
    RequestClass *request_class_inst_;

    void DoAccept()
    {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<RequestContext<RequestHandler, RequestClass>>
                        (std::move(socket), commands_, request_class_inst_)->Run();
                DoAccept();
            });
    }
};

void RunServer(short port)
{
    boost::asio::io_context io_context;
    auto *request_inst = new RequestClass(1);
    std::map<std::string, RequestClassMethod> commands {
        {"ADD_1", std::mem_fn(&RequestClass::add_n)},
        {"SUB_1", std::mem_fn(&RequestClass::sub_n)}
    };
    Server<RequestClassMethod, RequestClass> s(io_context, port, commands,
                                               request_inst);

    io_context.run();
}

void RunServerInBackground(short port)
{
    std::thread t([port] { RunServer(port); });
    t.detach();
}



int main()
{
    try
    {
        RunServerInBackground(5000);
        boost::asio::io_context io_context;
        tcp::socket s(io_context);
        tcp::resolver resolver(io_context);

        boost::asio::connect(s, resolver.resolve("127.0.0.1", "5000"));
        char request[2048] = R"({"COMMAND": "ADD_1", "VALUE" : 1})";
        size_t request_length = std::strlen(request);
        boost::asio::write(s, boost::asio::buffer(request, request_length));
        char reply[2048];
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, request_length));
        std::cout << "Reply is: ";
        std::cout << reply << std::endl;
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

传出缓冲区需要是类成员,就像data_,这样使用寿命就得到保证,直到async_write完成了。

您还可以通过 ASAN/UBSAN 或 Valgrind 等 linter/运行时检查来发现此类问题。

UPDATE

Also

size_t reply_length =
    boost::asio::read(s, boost::asio::buffer(reply, request_length));

错误地使用request_length。通常,请避免随时手动指定缓冲区大小。

此外,您的协议不提供帧,因此您实际上无法为较新的请求保持相同的连接打开(您不知道完整的响应需要多少字节)。我将通过在第一个请求后关闭连接来“修复”它,这样我们就有了一个有效的演示。

还有一个竞争条件continue_标志,但我会把它留给读者作为驱魔。

当然,要考虑不要泄漏请求类实例。

哦,我还改用了 Boost JSON,因为它似乎更适合:

住在科里鲁 http://coliru.stacked-crooked.com/a/32b2f448dc94cd22

#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
#include <iostream>

using boost::asio::ip::tcp;
using boost::system::error_code;
namespace json = boost::json;
using Value    = json::object;

/// NOTE: This class exists exclusively for unit testing.
struct Sample {
    int n_;

    Value add_n(Value const& request) const { return impl(std::plus<>{}, request); }
    Value sub_n(Value const& request) const { return impl(std::minus<>{}, request); }
    Value mul_n(Value const& request) const { return impl(std::multiplies<>{}, request); }
    Value div_n(Value const& request) const { return impl(std::divides<>{}, request); }

  private:
    template <typename Op> Value impl(Op op, Value const& req) const {
        return (req.contains("VALUE"))
            ? Value{{"VALUE", op(req.at("VALUE").as_int64(), n_)},
                    {"SUCCESS", true}}
            : Value{{"ERRORS", "Invalid value."}, {"SUCCESS", false}};
    }
};

using RequestClassMethod =
    std::function<Value(Sample, Value const&)>;

template <class RequestHandler, class RequestClass>
class RequestContext
    : public std::enable_shared_from_this<
          RequestContext<RequestHandler, RequestClass>> {
  public:
    using CommandMap = std::map<std::string, RequestHandler>;

    RequestContext(tcp::socket socket, CommandMap commands,
                   RequestClass* request_class_inst)
        : socket_(std::move(socket))
        , commands_(std::move(commands))
        , request_class_inst_(request_class_inst)
    {}

    void Run()  { DoRead(); }
    void Kill() { continue_ = false; }

  private:
    tcp::socket   socket_;
    CommandMap    commands_;
    RequestClass* request_class_inst_;
    bool          continue_ = true;
    char          data_[2048];
    std::string   resp_;

    void DoRead()
    {
        socket_.async_read_some(
            boost::asio::buffer(data_),
            [this, self = this->shared_from_this()](error_code ec, std::size_t length) {
                if (!ec) {
                    DoWrite(length);
                }
            });
    }

    void DoWrite(std::size_t length)
    {
        Value json_resp;

        try {
            auto json_req = json::parse({data_, length}).as_object();
            json_resp = ProcessRequest(json_req);
            json_resp["SUCCESS"] = true;
        } catch (std::exception const& ex) {
            json_resp = {{"SUCCESS", false}, {"ERRORS", ex.what()}};
        }

        resp_ = json::serialize(json_resp);

        boost::asio::async_write(socket_, boost::asio::buffer(resp_),
             [this, self = this->shared_from_this()](
                 error_code ec, size_t bytes_xfered) {
                 if (!ec)
                     DoRead();
             });
    }

    Value ProcessRequest(Value request)
    {
        auto command = request.contains("COMMAND")
            ? request["COMMAND"].as_string() //
            : "";
        std::string cmdstr(command.data(), command.size());

        // If command is not valid, give a response with an error.
        return commands_.contains(cmdstr) && request_class_inst_
            ? commands_.at(cmdstr)(*request_class_inst_, request)
            : Value{{"SUCCESS", false}, {"ERRORS","Invalid command."}};
    }
};

template<class RequestHandler, class RequestClass>
class Server {
  public:
    using CommandMap = std::map<std::string, RequestHandler>;

    Server(boost::asio::io_context& io_context, uint16_t port,
           const CommandMap& commands, RequestClass* request_class_inst)
        : acceptor_(io_context, {{}, port})
        , commands_(commands)
        , request_class_inst_(request_class_inst)
    {
        DoAccept();
    }

    ~Server() { Kill(); }
    void Kill() { continue_ = false; }

private:
    tcp::acceptor acceptor_;
    bool          continue_ = true;
    CommandMap    commands_;
    RequestClass *request_class_inst_;

    void DoAccept()
    {
        acceptor_.async_accept(
            [this](error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<
                        RequestContext<RequestHandler, RequestClass>>(
                        std::move(socket), commands_, request_class_inst_)
                        ->Run();
                DoAccept();
            });
    }
};

void RunServer(uint16_t port)
{
    boost::asio::io_context io_context;

    Server<RequestClassMethod, Sample> s(
        io_context, port,
        {{"ADD_2", std::mem_fn(&Sample::add_n)},
         {"SUB_2", std::mem_fn(&Sample::sub_n)},
         {"MUL_2", std::mem_fn(&Sample::mul_n)},
         {"DIV_2", std::mem_fn(&Sample::div_n)}},
        new Sample{2});

    io_context.run();
}

void RunServerInBackground(uint16_t port)
{
    std::thread t([port] { RunServer(port); });
    t.detach();
}

int main() try {
    RunServerInBackground(5000);
    ::sleep(1); // avoid startup race

    boost::asio::io_context io;
    tcp::socket s(io);
    s.connect({{}, 5000});

    std::string const request = R"({"COMMAND": "MUL_2", "VALUE" : 21})";
    std::cout << "Request: " << std::quoted(request, '\'') << std::endl;

    boost::asio::write(s, boost::asio::buffer(request));
    s.shutdown(tcp::socket::shutdown_send); // avoid framing problems

    error_code ec;
    char reply[2048];
    size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply), ec);

    std::cout << "Reply is: "
              << std::quoted(std::string_view(reply, reply_length), '\'')
              << " (" << ec.message() << ")" << std::endl;
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << "\n";
}

Prints

Request: '{"COMMAND": "MUL_2", "VALUE" : 21}'
Reply is: '{"VALUE":42,"SUCCESS":true}' (End of file)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Boost::Asio:为什么 async_write 在通过给定套接字发送缓冲区时会截断缓冲区? 的相关文章

  • 获取 TextBox 中的文本行数

    我试图通过标签显示文本框中的文本行数 但是 问题是如果最后一行为空 标签必须显示没有空行的行号 例如 如果它们有 5 行 最后一行为空 则标签应将行数显示为 4 Thanks private void txt CurrentVinFilte
  • C++0x 初始值设定项列表示例

    我想看看这个现有代码示例如何利用 C 0x 初始化列表功能 示例0 include
  • 静态 OpenCV 库中未定义的引用

    我有一个使用 OpenCV 3 1 的 C 项目 并且使用共享库可以正常工作 但现在我想使用静态库 位于项目目录中的文件夹中 来编译它 因为我希望能够在未安装 OpenCV 的情况下导出它 如果需要还可以编辑和重新编译 这次我重新编译了 O
  • 如何部署包含第三方 DLL 文件的 C# 应用程序?

    首先 我对部署了解不多 我希望我的问题有意义 我需要将 C 应用程序安装 部署到多个桌面 它需要一个第三方 DLL 文件 一个 C 库 lpsolve55 dll 对于那些感兴趣的人 它是一个免费的 MIP LP 求解器 请参阅 lpsol
  • 为什么我在 WinForms 列表框中得到“System.Data.DataRowView”而不是实际值?

    每当我运行代码并尝试查看highscore我在列表框中得到的只是System Data DataRowView 谁能明白为什么吗 Code MySqlConnection myConn new MySqlConnection connStr
  • 使用默认行为将模型绑定到接口

    我正在尝试将控制器操作绑定到接口 但仍保持默认的绑定行为 public class CoolClass ISomeInterface public DoSomething get set ISomeInterface public clas
  • 打开位置设置页面或提示用户启用位置

    我一直在绞尽脑汁 徒劳地谷歌搜索 我正在尝试找到一种方法来提示用户通过直接进入设置页面或仅点击屏幕上的 是 来切换位置 我见过的所有代码似乎都不起作用 有人有有效的方法吗 一个详细的例子将不胜感激 谢谢 我对 Xamarin 开发非常陌生
  • PartialView Action 正在调用自身

    我有 MVC 应用程序 它用于从主视图 ProductMaster 将 ProductAreaGrid 列表显示为 PartialView 并且它将在局部视图内将 CreateProductArea 作为 PartialView 我的 Gr
  • 多个线程访问一个变量

    我在正在读的一本教科书中发现了这个问题 下面也给出了解决方案 我无法理解最小值怎么可能是 2 为什么一个线程不能读取 0 而所有其他线程都执行并写入 1 而无论是1还是2 最后写入的线程仍然必须完成自己的循环 int n 0 int mai
  • 根据 Active Directory 策略检查密码[重复]

    这个问题在这里已经有答案了 我有一个允许用户更改其 AD 密码的前端 有没有办法获取特定用户及其属性 长度 复杂性 的密码策略 例如细粒度 有没有办法根据此特定策略检查字符串 xyz121 编辑 我不想检查活动目录中存储的当前密码 我想检查
  • 如果我重新分配并且新大小为 0,会发生什么情况。这与释放等效吗?

    给出以下代码 int a NULL a calloc 1 sizeof a printf d n a a realloc a 0 printf d n a return 0 它返回 4078904 0 这个 realloc 相当于 free
  • 文件加密与解密问题

    我一直在尝试在 VC Express 2010 中加密和解密文件 我见过的所有教程和文档都需要两个FileStreams 来加密文件 一个用于读取未加密的版本 另一个用于加密 当我实际编写代码时 它不断抛出错误 告诉我它无法打开该文件 因为
  • 如何解决文件被另一个进程使用的问题?

    我一直在 VS NET 2010 中调试 没有任何问题 但现在无法建造 我收到错误 Unable to copy file filename to bin Debug filename The process cannot access t
  • 这些工作队列标志意味着什么?

    在研究工作队列时 我遇到了内核中定义的工作队列标志和常量 我有以下我无法理解的疑问 这里的排水和救援到底是什么意思 WQ DRAINING 1 lt lt 6 internal workqueue is draining WQ RESCUE
  • 禁用实体框架的默认值生成(Code First)

    我数据库中有一个列不能为空 我想将其设置为默认值在数据库中 问题是实体框架似乎自己创建了一个默认值 例如 int gt 0 并且完全忽略了数据库中的默认值约束 有没有办法禁用实体框架的默认值 我发现您可以使用以下属性来装饰您的字段 Data
  • 如果项目包含多个文件夹,如何使用 Add-Migration

    我想Add Migration使用我的 DbContext 但出现错误 The term add migration is not recognized as the name of a cmdlet function script fil
  • Code::Blocks 中的调试似乎不起作用 - 缺少调试符号

    我正在尝试在 Code Blocks 中调试程序 我跟着本指南 http wiki codeblocks org index php title Debugging with Code Blocks and 这个短视频 http www y
  • XCode std::thread C++

    对于学校的一个小项目 我需要创建一个简单的客户端 服务器结构 它将在路由器上运行 使用 openWRT 并且我试图在这个应用程序中使用线程做一些事情 我的 C 技能非常有限 所以我在internet https stackoverflow
  • C# 和断点 - 这里有魔术师吗?

    我有这个 public static void ByLinkText string text for var i 0 i lt 50 i try Setup Driver FindElement By LinkText text Click
  • 线程安全的有限大小队列,不使用锁

    我正在尝试编写一个主题队列 但遇到死锁和其他多线程问题 我想用Interlocked CompareExchange避免lock用法 但这段代码并没有按预期工作 它只是擦除整个队列 我在这里做错了什么 public class FixedS

随机推荐