groupBy 的子流可以依赖于它们生成的键吗?

2024-02-21

我有一个包含与用户关联的数据的流程。我还为每个用户提供了一个状态,我可以从数据库异步获取该状态。

我想将我的流与每个用户一个子流分开,并在具体化子流时加载每个用户的状态,以便可以根据该状态来处理子流的元素。

如果我不想合并下游的子流,我可以做一些事情groupBy and Sink.lazyInit :

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)

但是,如果treatUser成为一个Flow,因为没有等价的Sink.lazyInit.

由于子流groupBy仅当推送新元素时才会具体化,应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码以使这项工作一致。同样地,Sink.lazyInit似乎不容易翻译成Flow case.

关于如何解决这个问题有什么想法吗?


您必须查看的相关 Akka 问题是#20129:添加 Sink.dynamic 和 Flow.dynamic https://github.com/akka/akka/issues/20129.

在相关的公关中#20579 https://github.com/akka/akka/pull/20579他们实际上实施了LazySink stuffs.

他们正计划做LazyFlow next:

将使用类似的签名执行下一个lazyFlow。

不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

groupBy 的子流可以依赖于它们生成的键吗? 的相关文章

  • 从apache Spark中的文本文件查找rdd中存储的数据大小

    我是 Apache Spark 版本 1 4 1 的新手 我编写了一段小代码来读取文本文件并将其数据存储在 Rdd 中 有没有一种方法可以获取 rdd 中数据的大小 这是我的代码 import org apache spark SparkC
  • 无法证明与路径相关类型的等价性

    为什么最后一个summon编译失败 我该怎么做才能让它编译 import java time LocalDateTime LocalTime trait Circular T type Parent given localTimeCircu
  • Scala 如何使用我的所有核心?

    object PrefixScan sealed abstract class Tree A case class Leaf A a A extends Tree A case class Node A l Tree A r Tree A
  • 强制类型差异

    在 Scala 中 我可以在编译时强制执行类型相等 例如 case class Foo A B a A b B implicit ev A B scala gt Foo 1 2 res3 Foo Int Int Foo 1 2 scala
  • 如何将 csv 文件读取为键值对的映射

    我的 csv 文件中有数据 例如 value key A Name B Name C Name 24 Age 25 Age 20 Age M Gender F Gender 我想解析它以生成以下地图 Map Name gt List A B
  • 对 HList 进行协变过滤

    我打算以协变方式过滤 HList 我也想包含子类 所以协变滤波器Foo应捕获以下元素Foo也Bar 我已经构建了这个例子来尝试 lt lt 看看它是否做了我想做的事情 http scastie org 6465 http scastie o
  • Scala 中的超时未来

    假设我有一个函数 它调用一个阻塞可中断的手术 我想在超时的情况下异步运行它 也就是说 我想在超时到期时中断该功能 所以我正在尝试做这样的事情 import scala util Try import scala concurrent Fut
  • 使用 Scala Slick 创建组合主键

    我正在尝试使用两列作为 Scala Slick 表的主键 这是我的表的定义方式 class NbaPlayerBoxScoreTable tag Tag extends Table NbaPlayerBoxScore tag player
  • Scala+Slick 3:将一个查询的结果插入到另一张表中

    这个问题是关于 slick 3 0 或 3 1 的 我对此很灵活 我有一个中间查询 我用它来处理map for等等以获得我想要的结果 最后我有一个 val foo DBIOAction Seq MySchema Bar NoStream E
  • 向数据框添加新列的问题 - Spark/scala

    我是 Spark scala 的新手 我正在尝试将一些数据从配置单元表读取到 Spark 数据帧 然后根据某些条件添加一列 这是我的代码 val DF hiveContext sql select from select from test
  • Spark SQL中如何按列降序排序?

    I tried df orderBy col1 show 10 但它是按升序排列的 df sort col1 show 10 也按升序排序 我查看了 stackoverflow 发现的答案都已过时或称为 RDD https stackove
  • 帮助我理解这段 Scala 代码:scalaz IO Monad 和隐式

    这是后续this https stackoverflow com questions 7404495 help me understand this scala code scalaz io monad问题 这是我试图理解的代码 它来自ht
  • scala.concurrent.blocking - 它实际上做了什么?

    我花了一段时间学习 Scala 执行上下文 底层线程模型和并发性的主题 你能解释一下通过什么方式吗scala concurrent blocking 调整运行时行为 and 可以提高性能或避免死锁 如中所述scaladoc http www
  • Scala - lambda 参数可以匹配元组吗?

    所以说我有一些清单 比如 val l List 1 blue 5 red 2 green 然后我想过滤掉其中一个 我可以做类似的事情 val m l filter item gt val n s item unpack the tuple
  • Scala:需要类类型,但找到了 T

    我发现了与此特定问题类似的问题 但是该问题是由于有人试图直接实例化 T 造成的 在这里 我试图创建一个特征 它是一个通用接口来扩展类并将它们自动存储在数据库中 例如 Riak 使用classOf T 使用 Scala 2 10 这是我的代码
  • akka http配置中的idle-timeout和request timeout有什么区别?

    我查阅了文档并发现了这些 空闲连接自动关闭的时间 设置infinite完全禁用空闲连接超时 空闲超时 10 秒 Defines the default time period within which the application has
  • 使用无形类型不等式时如何自定义 Scala 模糊隐式错误

    def typeSafeSum T lt Nat W lt Nat R lt Nat x T y W implicit sum Sum Aux T W R error R 7 x typeSafeSum 3 4 compilation er
  • Scala 模式匹配打印漂亮

    是否有可能以某种方式编组部分函数 假设它总是只包含一种情况 进入某物人类可读的 假设我们有 Any 类型的集合 消息 List Any 以及使用模式匹配块定义的 PartialFuntion Any T 的数量 case object R1
  • Scala REPL / SBT Console 是否有配置文件?

    我一直在尝试找到某种点文件来放入 Scala REPL 设置和自定义函数 我特别有兴趣传递它的标志 例如 Dscala color 启用语法突出显示 以及覆盖设置 如结果字符串截断 scala gt power scala gt vals
  • akka-http:找不到参数解组的隐式值

    我的 Spray json 支持看起来像这样 object MarshallingSupport extends SprayJsonSupport implicit def json4sFormats Formats DefaultForm

随机推荐

  • 如何让textview闪烁

    伙计们 我有一个文本视图 我需要它闪烁 请帮助我
  • 如何使用 Mootools 更改背景图像 css 属性?

    我是新手 如何使用 Mootools 更改我的背景图像 css 属性 例如 在页面加载时 div div
  • Espresso 测试,单击 X/Y 坐标

    知道如何为 Android 做到这一点吗 我似乎无法创建一个真正点击的方法 我知道你可以用 onview 做到这一点 但我只想要一个 x y 位置 答案已经给出here https stackoverflow com a 22798043
  • FireFox 中的 Selenium OpenQA.Selenium.DriverServiceNotFoundException

    我正在尝试开始编写 Selenium 测试 我编写的第一个非常基本的测试失败了 但有例外OpenQA Selenium DriverServiceNotFoundException using OpenQA Selenium using O
  • 获取月份为 01,02 而不是 1,2

    我正在使用 Calendar 类 更具体地说 我需要以两个数字的形式返回所有 12 个月 如果我使用以下代码 int month myCalendar get Calendar MONTH 1 这是我在不同月份得到的结果 1 2 3 4 5
  • Linux 内核 2.6.18 中的 sys_call_table

    我试图通过以下方式将系统退出调用设置为变量 extern void sys call table real sys exit sys call table NR exit 但是 当我尝试制作时 控制台给出了错误 error NR exit
  • Groovy 的尾递归

    我编写了 3 个阶乘算法 我预计会因堆栈溢出而失败 没问题 我尝试尾递归调用 并将以前的算法从递归转换为迭代 它不起作用 但我不明白为什么 I use trampoline 方法 效果如我所料 def factorial factorial
  • “fork()”后 printf 异常

    操作系统 Linux 语言 纯C 我正在继续学习一般的 C 编程 以及特殊情况下 UNIX 下的 C 编程 我发现了一个奇怪的 对我来说 行为printf 使用后的功能fork call Code include
  • 如果 pandas 中包含子字符串,则替换整个字符串

    我想替换包含特定子字符串的所有字符串 例如 如果我有这个数据框 import pandas as pd df pd DataFrame name Bob Jane Alice sport tennis football basketball
  • 使用 MVVM 将窗口句柄传递给 WPF 中的视图模型

    我正在使用需要窗口句柄的外部库 我的应用程序架构是 MVVM 但外部库并不完全适合该架构 我认为视图模型是调用需要窗口句柄的初始化函数的最合适的地方 如何将窗口句柄从我的视图获取到我的视图模型 通常 您的视图模型不应该了解视图的实现细节 例
  • jquery定时器实现

    All 是否有一个 jQuery 计时器可以启动 20 分钟的计时器并显示经过的时间 请指出它的一个小代码 var austDay new getTime austDay new getSeconds austDay var duratio
  • 使用 GCC 的软件流水线示例

    我正在寻找软件管道的真实 源代码和生成代码 示例 http en wikipedia org wiki Software pipelined http en wikipedia org wiki Software pipelining 由海
  • Excel - 获取列的前 5 个数据及其匹配的标题,但会产生重复项

    我正在开发一个使用 PHP 以 CodeIgniter 作为框架 制作的 Web 应用程序 它应该生成一个 excel 文件报告作为数据摘要 我使用 PHPSpreadsheet 作为库来生成 xlsx 文件 一切都很成功 我能够在单元格的
  • 即使作业成功完成后,内存使用量也不会降低

    我在 apscheduler 中添加了一项作业 该作业会在内存中加载一些数据 并在作业完成后删除所有对象 现在 如果我使用 python 运行此作业 它会成功运行 并且在进程成功退出后内存会下降 但是在 apscheduler 的情况下 内
  • 为什么 React Router v6 似乎无法从 URL 中删除查询字符串参数?

    我有一个应用程序 有时会加载查询字符串参数t 一开始 我希望应用程序读取此参数 如果可用 并将其从 URL 中删除 在根组件上 我正在这样做 const searchParams setSearchParams useSearchParam
  • 使用 Rails 在 Net::HTTP::Get.new 中设置自定义超时

    我正在使用此代码来抓取外部 html 文件 link URI parse url request Net HTTP Get new link path response Net HTTP start link host link port
  • iPhone:无法让模拟器生成 .gcda 分析数据文件

    我正在尝试使用 iPhone 模拟器分析我的代码 我已启用生成测试覆盖率文件 and 仪器程序流程并添加了 lgcov到链接器标志 根据我读过的所有内容 这应该是我在设置方面需要做的全部事情 Update 生成测试覆盖率文件触发 f测试覆盖
  • 在 iOS 上录制、修改和播放音频

    EDIT 最后 我完全按照下面的解释 使用 AVRecorder 来录制语音 使用 openAL 来进行音调转换和播放 效果很好 我有一个关于录制 修改和播放音频的问题 我之前也问过类似的问题 在 iOS 上实时录制 修改音高和播放音频 h
  • NPAPI 插件未在 Chrome 上加载

    我有一个由 dll 和 manifest json 文件组成的 npapi 插件 此 npapi dll 被检测为 chrome 上的插件 即它列在 about plugins 页面上 但是 当我使用标签在示例 html 页面中调用此插件时
  • groupBy 的子流可以依赖于它们生成的键吗?

    我有一个包含与用户关联的数据的流程 我还为每个用户提供了一个状态 我可以从数据库异步获取该状态 我想将我的流与每个用户一个子流分开 并在具体化子流时加载每个用户的状态 以便可以根据该状态来处理子流的元素 如果我不想合并下游的子流 我可以做一