This http://article.gmane.org/gmane.comp.lib.boost.asio.user/4660邮件列表帖子描述了同样的问题。尽管CreateFile
with FILE_FLAG_OVERLAPPED
允许异步 I/O,但它不会将其建立为 Boost.Asio 上下文中的流。对于流,Boost.Asio 实现read_some
as read_some_at
偏移量总是0
。这就是问题的根源,因为ReadFile() http://msdn.microsoft.com/en-us/library/windows/desktop/aa365467%28v=vs.85%29.aspx文档指出:
对于支持字节偏移的文件,您必须指定从文件开始读取的字节偏移。
适应类型要求
Boost.Asio 的编写非常通用,通常需要参数满足某种类型要求,而不是特定类型。因此,通常可以调整 I/O 对象或其服务以获得所需的行为。首先,必须确定适配的接口需要支持什么。在这种情况下,async_read_until http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/async_read_until.html接受满足类型要求的任何类型AsyncReadStream http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/AsyncReadStream.html. AsyncReadStream
的要求相当基本,需要void async_read_some(MutableBufferSequence, ReadHandler)
成员函数。
由于需要在整个组合中跟踪偏移值async_read_until
操作时,可以引入满足 ReadHandler 要求的简单类型,该类型将包装应用程序的 ReadHandler,并相应地更新偏移量。
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
The asio_handler_invoke
钩子将通过 ADL 找到,以支持在适当的上下文中调用用户处理程序。当在一个组合内调用组合操作时,这对于踩踏安全至关重要。strand http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/overview/core/strands.html。有关组合操作和链的更多详细信息,请参阅this https://stackoverflow.com/a/12801042/1053968 answer.
下面的课程将进行调整boost::asio::windows::random_access_handle http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/windows__random_access_handle.html以满足类型要求AsyncReadStream
.
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
或者,boost::asio::windows::basic_stream_handle http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/windows__basic_stream_handle.html可以提供满足要求的定制类型流处理服务 http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/StreamHandleService.html类型,并实施async_read_some
按照async_read_some_at
.
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
我在示例代码中选择了简单性,但多个 I/O 对象将使用相同的服务。就这样offset_stream_handle_service
当多个 I/O 对象使用该服务时,需要管理每个处理程序的偏移量才能正常运行。
要使用适应的类型,请修改AsyncReader::input_handle
成员变量可以是basic_adapted_stream<boost::asio::windows::random_access_handle>
(改编的 I/O 对象)或boost::asio::windows::basic_stream_handle<offset_stream_handle_service>
(调整后的服务)。
Example
这是基于原始代码的完整示例,仅修改了AsyncReader::input_handler
's type:
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
offset_stream_handle_service> adapted_stream;
#endif
class AsyncReader
{
public:
AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
: io_service_(io_service),
input_buffer(/*size*/ 8192),
input_handle(io_service, handle)
{
start_read();
}
void start_read()
{
boost::asio::async_read_until(input_handle, input_buffer, '\n',
boost::bind(&AsyncReader::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, std::size_t length);
// void handle_write(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::streambuf input_buffer;
adapted_stream input_handle;
};
void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
static int count = 0;
++count;
// method 1: (same problem)
// const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
// std::string s(pStart, length);
// input_buffer.consume(length);
// method 2:
std::istream is(&input_buffer);
std::string s;
assert(std::getline(is, s));
std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
start_read();
}
else if (error == boost::asio::error::not_found)
std::cerr << "Did not receive ending character!\n";
else
std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::asio::io_service io_service;
HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
GENERIC_READ,
0, // share mode
NULL, // security attribute: NULL = default
OPEN_EXISTING, // creation disposition
FILE_FLAG_OVERLAPPED,
NULL // template file
);
AsyncReader obj(io_service, handle);
io_service.run();
std::cout << "Normal termination\n";
getchar();
return 0;
}
当使用原始问题的输入时,会产生以下输出:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Misc error during read!
Normal termination
我的输入文件没有\n
F 行末尾的字符。因此,AsyncReader::handle_read()
被调用时出现以下错误boost::asio::error::eof
and input_buffer
的内容包含 LINE F。修改最后的 else 情况后打印更多信息:
...
else
{
std::cerr << "Error: " << error.message() << "\n";
if (std::size_t buffer_size = input_buffer.size())
{
boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
std::string contents(boost::asio::buffers_begin(bufs),
boost::asio::buffers_begin(bufs) + buffer_size);
std::cerr << "stream contents: '" << contents << "'\n";
}
}
我得到以下输出:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Error: End of file
stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Normal termination