Stream::map_err https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.map_err— 提供了错误值,它可以转换类型,但会将其保留为错误。
Stream::or_else https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.or_else— 提供错误值,它可以将错误转换为成功,而成功值保持不变。
Stream::then https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.then— 提供成功和错误值,可以做任何你想做的事情。
Stream::map
无法让您将错误转化为成功,因此它没有用。Stream::or_else
does提供该能力,但当您可以将错误类型转换为成功类型时使用它。仅有的Stream::then
使您能够同时转换两种类型。
Stream::flatten https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.flatten可用于将流的流转换为单个流。
结合这一事实Result
可以被视为一个迭代器,你可以创建这个:
stream
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten()
无论流的项目是否Ok
or Err
,我们将其转换为迭代器并从中创建一个流。然后我们压平溪流。
如果你想打印出错误,我会使用Stream::inspect_err https://docs.rs/futures/0.1/futures/stream/trait.Stream.html#method.inspect_err:
stream.inspect_err(|err| println!("Error on {:?}", err))
完整代码:
use futures::{
future,
stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14
fn main() {
let stream = stream::iter_ok({
(0..10).map(|num| {
println!("Started {}", num);
match num % 3 {
0 => future::ok(num),
_ => future::err(num),
}
})
})
.buffer_unordered(2);
let stream = stream
.inspect_err(|err| println!("Error on {:?}", err))
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten();
tokio::run({
stream.for_each(|n| {
println!("Success on {:?}", n);
Ok(())
})
});
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9