我有一个函数可以生成futures::Stream
基于一个论点。我想多次调用这个函数并将流压平在一起。使问题变得复杂的是,我想将流返回的值作为参数提供给原始函数。
具体来说,我有一个函数可以将数字流返回到零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从 5 开始调用这个函数。还应该为每个返回的奇数调用该函数。总的调用集numbers_down_to_zero
将会:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
产生总流量
4
3
2
1
0
2
1
0
0
0
有哪些技术可以实现这一点?
你可以用以下方法解决这个问题unfold
。您将有一个“状态”结构,它保留“基本流”(在本例中倒数为零)和将生成新流的项目列表,并将其用作参数unfold
展开时保持状态。
这样编译器就不必推理生命周期所有权,因为状态可以移动到async
每次调用闭包时都会阻塞。
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}
// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};
// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}
StreamExt::next https://docs.rs/futures/0.3.1/futures/stream/trait.StreamExt.html#method.next要求内部流实现Unpin
,因此它不能用于任意流。您可以随时使用Box::pin(stream)
相反,因为Pin<Box<T>>
is Unpin
并实施Stream
if T: Stream
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)