我有一个消耗无限数据流的程序。在此过程中,我想记录一些指标,这些指标形成一个幺半群,因为它们只是简单的求和和平均值。我想定期在某处写下这些指标,清除它们,然后返回累积它们。我基本上有:
object Foo {
type MetricsIO[A] = StateT[IO, MetricData, A]
def recordMetric(m: MetricData): MetricsIO[Unit] = {
StateT.modify(_.combine(m))
}
def sendMetrics: MetricsIO[Unit] = {
StateT.modifyF { s =>
val write: IO[Unit] = writeMetrics(s)
write.attempt.map {
case Left(_) => s
case Right(_) => Monoid[MetricData].empty
}
}
}
}
所以大部分执行使用IO
直接使用并升降机StateT.liftF
。在某些情况下,我会添加一些调用recordMetric
。最后我得到了一个流:
val mainStream: Stream[MetricsIO, Bar] = ...
我想定期(比如每分钟左右)转储指标,所以我尝试了:
val scheduler: Scheduler = ...
val sendStream =
scheduler
.awakeEvery[MetricsIO](FiniteDuration(1, TimeUnit.Minutes))
.evalMap(_ => Foo.sendMetrics)
val result = mainStream.concurrently(sendStream).compile.drain
然后我做通常的顶级程序调用run
与开始状态然后调用unsafeRunSync
.
问题是,我只看到空的指标!我怀疑这与我的幺半群隐式提供空指标有关sendStream
但我不太明白为什么会这样或如何解决它。也许有一种方法可以“交错”这些sendMetrics
而是调用主流?
编辑:这是一个最小的完整可运行示例:
import fs2._
import cats.implicits._
import cats.data._
import cats.effect._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
val sec = Executors.newScheduledThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(sec)
type F[A] = StateT[IO, List[String], A]
val slowInts = Stream.unfoldEval[F, Int, Int](1) { n =>
StateT(state => IO {
Thread.sleep(500)
val message = s"hello $n"
val newState = message :: state
val result = Some((n, n + 1))
(newState, result)
})
}
val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[F](FiniteDuration(1, SECONDS))
val slowIntsPeriodicallyClearedState = slowInts.either(ticks).evalMap[Int] {
case Left(n) => StateT.liftF(IO(n))
case Right(_) => StateT(state => IO {
println(state)
(List.empty, -1)
})
}
现在如果我这样做:
slowInts.take(10).compile.drain.run(List.empty).unsafeRunSync
然后我得到了预期的结果 - 状态正确地累积到输出中。但如果我这样做:
slowIntsPeriodicallyClearedState.take(10).compile.drain.run(List.empty).unsafeRunSync
然后我看到一个空列表始终打印出来。我希望打印出部分列表(大约 2 个元素)。