如何在 fs2 中“拆分”流?

2024-04-20

我想做这样的事情:

def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) = 
  (stream, stream.map(split)

但这不起作用,因为它从源“拉”两次 - 当我耗尽两者时每次一次stream and stream.map(split)。我该如何防止这种情况?通过维护自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?


通过维护自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?

是的。例如,您可以使用 fs2 中的队列:

def splitStream[F[_], A](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] = 
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a)).onFinalize(q.enqueue1(None)), q.dequeue.map(split))

当然,这里的问题是,如果调用者忽略任一流,则另一个流将死锁并且永远不会发出任何内容。这通常是您在尝试将一个流分成多个流时遇到的问题,并且无论何时订阅,每个子流中都会保证出现一个值。

我通常寻求的解决方案是组合更大的操作并使用像broadcast or parJoin:

def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  base.broadcastTo(run: _*).compile.drain

在这里,您知道将有多少个消费者,因此首先不会有被忽略的流。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 fs2 中“拆分”流? 的相关文章

随机推荐