Since StreamingBody
实施Stream<Item = Vec<u8>, Error = Error>
,我们可以构建一个MCVE https://stackoverflow.com/help/mcve这代表着:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
type Error = Box<std::error::Error>;
fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
stream::iter_ok(iter_of_owned_bytes)
}
然后我们可以以某种方式获得“流媒体主体”并使用Stream::for_each https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.for_each处理中的每个元素Stream
。在这里,我们只需调用write_all
提供一些输出位置:
use std::{fs::File, io::Write};
fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
然后我们可以编写一些测试主要内容:
fn main() {
let mut file = Vec::new();
{
let fut = save_to_disk(&mut file);
fut.wait().expect("Could not drive future");
}
assert_eq!(file, b"0123456789ABCDEF");
}
关于这个简单实现的质量的重要注意事项:
致电给write_all
可能会阻塞,您不应该在异步程序中这样做。最好将阻塞工作交给线程池。
的用法Future::wait
强制线程阻塞,直到未来完成,这对于测试来说非常有用,但对于您的实际用例可能不正确。
也可以看看:
- 在 future-rs 中封装阻塞 I/O 的最佳方法是什么? https://stackoverflow.com/q/41932137/155423
- 如何在稳定的 Rust 中同步返回在异步 Future 中计算的值? https://stackoverflow.com/q/52521201/155423