我有一个包含与用户关联的数据的流程。我还为每个用户提供了一个状态,我可以从数据库异步获取该状态。
我想将我的流与每个用户一个子流分开,并在具体化子流时加载每个用户的状态,以便可以根据该状态来处理子流的元素。
如果我不想合并下游的子流,我可以做一些事情groupBy
and Sink.lazyInit
:
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...
val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)
但是,如果treatUser
成为一个Flow
,因为没有等价的Sink.lazyInit
.
由于子流groupBy
仅当推送新元素时才会具体化,应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码以使这项工作一致。同样地,Sink.lazyInit
似乎不容易翻译成Flow
case.
关于如何解决这个问题有什么想法吗?
您必须查看的相关 Akka 问题是#20129:添加 Sink.dynamic 和 Flow.dynamic https://github.com/akka/akka/issues/20129.
在相关的公关中#20579 https://github.com/akka/akka/pull/20579他们实际上实施了LazySink
stuffs.
他们正计划做LazyFlow
next:
将使用类似的签名执行下一个lazyFlow。
不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)