在处理流时如何删除或忽略错误?

2024-01-12

我有一个很长的期货清单,我想使用它们来运行Stream::buffer_unordered/Stream::buffered。我将这个流结合成一个未来for_each然后用 Tokio 执行这一切。其中一个期货返回错误是很常见的。根据文档,for_each当返回错误时将停止。

当返回这些错误时,如何忽略或只打印一条消息并继续执行后续的 future?

这是与我的情况类似的一般代码:

use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num)
        };
        future
    });

    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });

    tokio::runtime::run(future);
}

如果您尝试这个示例,当出现以下情况时,Future 队列将提前停止执行:Err被抛出。


  • Stream::map_err https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.map_err— 提供了错误值,它可以转换类型,但会将其保留为错误。

  • Stream::or_else https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.or_else— 提供错误值,它可以将错误转换为成功,而成功值保持不变。

  • Stream::then https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.then— 提供成功和错误值,可以做任何你想做的事情。

Stream::map无法让您将错误转化为成功,因此它没有用。Stream::or_else does提供该能力,但当您可以将错误类型转换为成功类型时使用它。仅有的Stream::then使您能够同时转换两种类型。

Stream::flatten https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.flatten可用于将流的流转换为单个流。

结合这一事实Result可以被视为一个迭代器,你可以创建这个:

stream
    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
    .flatten()

无论流的项目是否Ok or Err,我们将其转换为迭代器并从中创建一个流。然后我们压平溪流。

如果你想打印出错误,我会使用Stream::inspect_err https://docs.rs/futures/0.1/futures/stream/trait.Stream.html#method.inspect_err:

stream.inspect_err(|err| println!("Error on {:?}", err))

完整代码:

use futures::{
    future,
    stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14

fn main() {
    let stream = stream::iter_ok({
        (0..10).map(|num| {
            println!("Started {}", num);
            match num % 3 {
                0 => future::ok(num),
                _ => future::err(num),
            }
        })
    })
    .buffer_unordered(2);

    let stream = stream
        .inspect_err(|err| println!("Error on {:?}", err))
        .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
        .flatten();

    tokio::run({
        stream.for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        })
    });
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在处理流时如何删除或忽略错误? 的相关文章

随机推荐