UPDATE
最后补充了附加(bonus)版本。现在这个答案提供了三个完整的代码版本:
- 按照要求,使用 managed_shared_memory
- 使用 message_queue,作为一种更自然的上传/传输方式
- 使用 TCP 套接字(Asio),以演示其灵活性
所有这些都只用到了 Boost。
托管共享内存段(managed shared memory segment)可以存放任意对象。因此,你可以定义一个这样的对象:
/// Process-local description of a file: its name together with its raw bytes.
/// Remains a simple aggregate; both members start out empty.
struct MyFile {
    std::string       _filename{};  ///< Name (path) of the file.
    std::vector<char> _contents{};  ///< Raw bytes of the file.
};
并将其存储在那里。不过先别急:这些成员只有搭配进程间(interprocess)分配器才能安全地存放在共享内存中,所以需要加一点“魔法调料”(也就是一堆用于声明分配器的 typedef,以及几个构造函数):
namespace Shared {
using Mem = bip::managed_shared_memory;
using Mgr = Mem::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<bip::allocator<T, Mgr>>;
template <typename T> using Vector = bc::vector<T, Alloc<T>>;
using String =
bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
struct MyFile {
using allocator_type = Alloc<char>;
template <typename It>
explicit MyFile(std::string_view name, It b, It e, allocator_type alloc)
String _filename;
Vector<char> _contents;
};
}
现在您可以存储文件,例如:
// Open the segment named "shared_mem", creating it with a size of
// 10ull << 30 bytes (10 GiB) if it does not exist yet.
Shared::Mem shm(bip::open_or_create, "shared_mem", 10ull << 30);

// Use one variable for both opening the stream and naming the stored object,
// so the stored name always matches the file that was actually read.
char const* const filename = "file_name.txt";
std::ifstream ifs(filename, std::ios::binary);
std::istreambuf_iterator<char> data_begin{ifs}, data_end{};

// Find the object named "file1" in the segment, or construct it from the
// file name, the stream's bytes and the segment manager.
auto loaded = shm.find_or_construct<Shared::MyFile>("file1")(
    filename, data_begin, data_end,
    shm.get_segment_manager());
请注意,共享内存实际上不会立即占用 10GiB,尽管 10ull << 30 指定的正是这个大小。在大多数操作系统上,这会是稀疏分配,只有实际包含数据的页面才会被提交。
改进
您可能想知道 scoped_allocator_adaptor 是做什么用的。我们好像还没用到它吧?
其实,思路是不要对每个文件直接调用 find_or_construct,而是存储一个 Vector<MyFile>,这样您就可以利用 BIP 分配器的全部功能。
下面的完整演示程序可以这样调用:
- 带文件名参数:这些文件都将被加载(前提是它们作为常规文件存在)
- 不带参数:列出之前已加载的文件
在 Coliru 上在线运行(Live On Coliru)
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/managed_mapped_file.hpp> // for COLIRU
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/container/scoped_allocator.hpp>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <utility>
namespace bip = boost::interprocess;
namespace bc = boost::container;
namespace fs = std::filesystem;
namespace Shared {
#ifdef COLIRU
using Mem = bip::managed_mapped_file; // managed_shared_memory not allows
#else
using Mem = bip::managed_shared_memory;
#endif
using Mgr = Mem::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<bip::allocator<T, Mgr>>;
template <typename T> using Vector = bc::vector<T, Alloc<T>>;
using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
struct MyFile {
using allocator_type = Alloc<char>;
MyFile(MyFile&&) = default;
MyFile(MyFile const& rhs, allocator_type alloc)
: _filename(rhs._filename.begin(), rhs._filename.end(), alloc),
_contents(rhs._contents.begin(), rhs._contents.end(), alloc) {}
MyFile& operator=(MyFile const& rhs) {
_filename.assign(rhs._filename.begin(), rhs._filename.end());
_contents.assign(rhs._contents.begin(), rhs._contents.end());
return *this;
}
template <typename It>
explicit MyFile(std::string_view name, It b, It e, allocator_type alloc)
: _filename(name.data(), name.size(), alloc),
_contents(b, e, alloc) {}
String _filename;
Vector<char> _contents;
friend std::ostream& operator<<(std::ostream& os, MyFile const& mf) {
return os << "Name: " << std::quoted(mf._filename.c_str())
<< " content size: " << mf._contents.size();
}
};
} // namespace Shared
/// Demo entry point for the shared-memory version.
/// - Without arguments: lists the files already stored in the segment.
/// - With arguments: loads every argument that names a regular file into the
///   shared `FileList`.
int main(int argc, char** argv) {
    // Open (or create with 512 KiB) the segment named "shared_mem".
    Shared::Mem shm(bip::open_or_create, "shared_mem", 512ull << 10);

    using FileList = Shared::Vector<Shared::MyFile>;
    auto& shared_files =
        *shm.find_or_construct<FileList>("FileList")(shm.get_segment_manager());

    if (1 == argc) {
        std::cout << "Displaying previously loaded files: \n";
        for (auto const& entry : shared_files)
            std::cout << entry << std::endl;
    } else {
        std::cout << "Loading files: \n";
        // Bind by const reference: no need to copy each path.
        for (auto const& file : std::vector<fs::path>{argv + 1, argv + argc}) {
            try {
                // is_regular_file() may throw (e.g. on a permission error), so
                // it is checked inside the try block: one bad argument must
                // not abort the whole run.
                if (!is_regular_file(file))
                    continue;

                std::ifstream ifs(file, std::ios::binary);
                std::istreambuf_iterator<char> data_begin{ifs}, data_end{};

                auto& loaded = shared_files.emplace_back(
                    file.native(), data_begin, data_end);

                std::cout << loaded << std::endl;
            } catch (std::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            } catch (std::exception const& se) {
                std::cerr << "Other: " << se.what() << std::endl;
            }
        }
    }
}
使用以下命令编译并运行时:
g++ -std=c++17 -O2 -Wall -pedantic -pthread main.cpp -lrt -DCOLIRU
./a.out main.cpp a.out
./a.out
输出:
Loading files:
Name: "main.cpp" content size: 3239
Name: "a.out" content size: 175176
Displaying previously loaded files:
Name: "main.cpp" content size: 3239
Name: "a.out" content size: 175176
BONUS
针对评论中的讨论,我认为值得做一个实际的比较。
消息队列版本
为了进行比较,这里给出一个消息队列实现。
在 Coliru 上在线运行(Live On Coliru)
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/endian/arithmetic.hpp>
#include <fstream>
#include <filesystem>
#include <iostream>
#include <iomanip>
namespace bip = boost::interprocess;
namespace fs = std::filesystem;
using bip::message_queue;
static constexpr auto MAX_FILENAME_LENGH = 512; // 512 bytes max filename length
static constexpr auto MAX_CONTENT_SIZE = 512ull << 10; // 512 KiB max payload size

/// One file packed into a single flat byte buffer laid out as
/// [header_t][filename bytes][content bytes].
struct Message {
    std::vector<char> _buffer;

    using Uint32 = boost::endian::big_uint32_t;

    /// Fixed-size prefix holding the lengths of the two variable parts.
    struct header_t {
        Uint32 filename_length;
        Uint32 content_size;
    };
    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    static constexpr auto header_len = sizeof(header_t);
    static constexpr auto max_size =
        header_len + MAX_FILENAME_LENGH + MAX_CONTENT_SIZE;

    Message() = default;

    /// Encodes the file at `file`: header, then the native path, then the
    /// bytes read from the file.
    Message(fs::path file) {
        std::string const path_text = file.native();

        // Zero-filled room for the header, followed by the file name.
        _buffer.resize(header_len);
        _buffer.insert(_buffer.end(), path_text.begin(), path_text.end());

        // Append everything the stream yields.
        std::ifstream input(file, std::ios::binary);
        _buffer.insert(_buffer.end(),
                       std::istreambuf_iterator<char>{input},
                       std::istreambuf_iterator<char>{});

        // Fill in the header last, once the final sizes are known.
        header().filename_length = path_text.length();
        header().content_size = size() - header_len - path_text.length();
    }

    /// Wraps a copy of `size` already-encoded bytes starting at `buf`.
    Message(char const* buf, size_t size)
        : _buffer(buf, buf+size) {}

    char const* data() const { return _buffer.data(); }
    size_t size() const { return _buffer.size(); }

    /// Mutable view of the header at the start of the buffer.
    header_t& header() {
        assert(_buffer.size() >= header_len);
        auto* const raw = _buffer.data();
        return *reinterpret_cast<header_t*>(raw);
    }

    /// Read-only view of the header at the start of the buffer.
    header_t const& header() const {
        assert(_buffer.size() >= header_len);
        auto const* const raw = _buffer.data();
        return *reinterpret_cast<header_t const*>(raw);
    }

    /// The file-name bytes that follow the header.
    std::string_view filename() const {
        size_t const name_len = header().filename_length;
        assert(_buffer.size() >= header_len + name_len);
        return std::string_view(_buffer.data() + header_len, name_len);
    }

    /// The content bytes that follow the file name.
    std::string_view contents() const {
        size_t const name_len = header().filename_length;
        size_t const body_len = header().content_size;
        assert(_buffer.size() >= header_len + name_len + body_len);
        return std::string_view(_buffer.data() + header_len + name_len,
                                body_len);
    }

    /// Prints the quoted file name and the number of content bytes.
    friend std::ostream& operator<<(std::ostream& os, Message const& mf) {
        os << "Name: " << std::quoted(mf.filename());
        os << " content size: " << mf.contents().size();
        return os;
    }
};
/// Demo entry point for the message-queue version.
/// - Without arguments: receives messages from the queue forever and prints
///   each one.
/// - With arguments: encodes every argument that names a regular file and
///   sends it through the queue.
int main(int argc, char** argv) {
    // Queue named "file_transport": 10 messages of up to Message::max_size bytes.
    message_queue mq(bip::open_or_create, "file_transport", 10, Message::max_size);

    if (1 == argc) {
        std::cout << "Receiving uploaded files: \n";
        // Message::max_size exceeds 512 KiB; keep the receive buffer on the
        // heap rather than the stack.
        std::vector<char> rawbuf(Message::max_size);
        while (true) {
            size_t n = 0;
            unsigned prio = 0;
            mq.receive(rawbuf.data(), rawbuf.size(), n, prio);

            Message decoded(rawbuf.data(), n);
            std::cout << "Received: " << decoded << std::endl;
        }
    } else {
        std::cout << "Loading files: \n";
        // Bind by const reference: no need to copy each path.
        for (auto const& file : std::vector<fs::path>{argv + 1, argv + argc}) {
            try {
                // is_regular_file() may throw (e.g. on a permission error), so
                // it is checked inside the try block: one bad argument must
                // not abort the whole run.
                if (!is_regular_file(file))
                    continue;

                Message encoded(file);
                std::cout << "Sending: " << encoded << std::endl;

                mq.send(encoded.data(), encoded.size(), 0);
            } catch (std::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            } catch (std::exception const& se) {
                std::cerr << "Other: " << se.what() << std::endl;
            }
        }
    }
}
演示:
请注意,此方法存在文件大小限制,因为消息具有最大长度。
TCP 套接字版本
这是一个 TCP 套接字实现。
在 Coliru 上在线运行(Live On Coliru)
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <vector>
#include <fstream>
#include <filesystem>
#include <iostream>
#include <iomanip>
namespace fs = std::filesystem;
using boost::asio::ip::tcp;
using boost::system::error_code;
static constexpr auto MAX_FILENAME_LENGH = 512; // 512 bytes max filename length
static constexpr auto MAX_CONTENT_SIZE = 512ull << 10; // 512 KiB max payload size

/// One file packed into a single flat byte buffer laid out as
/// [header_t][filename bytes][content bytes]. On the receiving side the
/// buffer may hold a partial message; use is_complete() before decoding.
struct Message {
    std::vector<char> _buffer;

    using Uint32 = boost::endian::big_uint32_t;

    /// Fixed-size prefix holding the lengths of the two variable parts.
    struct header_t {
        Uint32 filename_length;
        Uint32 content_size;
    };
    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    Message() = default;

    /// Encodes the file at `file`: header, then the native path, then the
    /// bytes read from the file.
    Message(fs::path file) {
        std::string const name = file.native();
        std::ifstream ifs(file, std::ios::binary);
        std::istreambuf_iterator<char> data_begin{ifs}, data_end{};

        _buffer.resize(header_len + name.length());
        std::copy(begin(name), end(name), _buffer.data() + header_len);
        _buffer.insert(_buffer.end(), data_begin, data_end);
        // The header is written last, once the final sizes are known.
        header().filename_length = name.length();
        header().content_size = actual_size() - header_len - name.length();
    }

    /// Wraps a copy of `size` already-encoded bytes starting at `buf`.
    Message(char const* buf, size_t size)
        : _buffer(buf, buf+size) {}

    static constexpr auto header_len = sizeof(header_t);
    static constexpr auto max_size =
        header_len + MAX_FILENAME_LENGH + MAX_CONTENT_SIZE;

    char const* data() const { return _buffer.data(); }

    /// Number of bytes currently held in the buffer.
    size_t actual_size() const { return _buffer.size(); }

    /// Payload size announced by the header (file name + contents), NOT
    /// counting the header itself. The two 32-bit fields are widened before
    /// adding so a hostile header cannot make the sum wrap in 32 bits.
    size_t decoded_size() const {
        return static_cast<size_t>(header().filename_length) +
               static_cast<size_t>(header().content_size);
    }

    /// True once the buffer holds the header AND the whole announced payload.
    /// decoded_size() excludes the header, so header_len must be added here;
    /// otherwise a partially received message would be reported as complete.
    bool is_complete() const {
        return actual_size() >= header_len &&
               actual_size() >= header_len + decoded_size();
    }

    /// Mutable view of the header at the start of the buffer.
    header_t& header() {
        assert(actual_size() >= header_len);
        return *reinterpret_cast<header_t*>(_buffer.data());
    }

    /// Read-only view of the header at the start of the buffer.
    header_t const& header() const {
        assert(actual_size() >= header_len);
        return *reinterpret_cast<header_t const*>(_buffer.data());
    }

    /// The file-name bytes that follow the header.
    std::string_view filename() const {
        assert(actual_size() >= header_len + header().filename_length);
        return std::string_view(_buffer.data() + header_len,
                                header().filename_length);
    }

    /// The content bytes that follow the file name.
    std::string_view contents() const {
        // The contents end at header_len + decoded_size(), so that is the
        // bound the buffer has to satisfy.
        assert(actual_size() >= header_len + decoded_size());
        return std::string_view(_buffer.data() + header_len +
                                    header().filename_length,
                                header().content_size);
    }

    /// Prints the quoted file name and the number of content bytes.
    friend std::ostream& operator<<(std::ostream& os, Message const& mf) {
        return os << "Name: " << std::quoted(mf.filename())
                  << " content size: " << mf.contents().size();
    }
};
/// Demo entry point for the TCP version.
/// - Without arguments: accepts connections on the port forever and prints
///   every complete message received on each one.
/// - With arguments: connects to the port and sends every argument that names
///   a regular file as one encoded message.
int main(int argc, char** argv) {
    boost::asio::io_context ctx;
    // `unsigned short` instead of the non-standard `u_int16_t`, which is not
    // available on every platform.
    unsigned short const port = 8989;

    if (1 == argc) {
        std::cout << "Receiving uploaded files: " << std::endl;
        tcp::acceptor acc(ctx, tcp::endpoint{{}, port});

        while (true) {
            auto s = acc.accept();
            std::cout << "Connection accepted from " << s.remote_endpoint() << std::endl;

            Message msg;
            // Received bytes are appended directly to msg._buffer.
            auto buf = boost::asio::dynamic_buffer(msg._buffer);
            error_code ec;
            while (auto n = read(s, buf, ec)) {
                std::cout << "(read " << n << " bytes, " << ec.message() << ")" << std::endl;

                // One read may deliver several messages: decode and drop them
                // one at a time from the front of the buffer.
                while (msg.is_complete()) {
                    std::cout << "Received: " << msg << std::endl;
                    buf.consume(msg.decoded_size() + Message::header_len);
                }
            }
            std::cout << "Connection closed" << std::endl;
        }
    } else {
        std::cout << "Loading files: " << std::endl;

        tcp::socket s(ctx);
        s.connect(tcp::endpoint{{}, port});

        // Bind by const reference: no need to copy each path.
        for (auto const& file : std::vector<fs::path>{argv + 1, argv + argc}) {
            try {
                // is_regular_file() may throw (e.g. on a permission error), so
                // it is checked inside the try block: one bad argument must
                // not abort the whole run.
                if (!is_regular_file(file))
                    continue;

                Message encoded(file);
                std::cout << "Sending: " << encoded << std::endl;

                write(s, boost::asio::buffer(encoded._buffer));
            } catch (std::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            } catch (std::exception const& se) {
                std::cerr << "Other: " << se.what() << std::endl;
            }
        }
    }
}
演示:
请注意,这种方式可以轻松扩展到更大的文件、单个连接中的多个文件,必要时甚至可以同时处理多个连接。它也不需要双重缓冲,从而提高了性能。
这就是这种方法比其他任何方法都更常见的原因。