Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

2024-05-15

我对 Scala 和 Akka 完全陌生。我有一个简单的 RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 中的所有 100 个元素)并将这个新元素提供给 Sink。

我做了一些研究发现显式用户定义的缓冲区 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html但我不明白如何访问 flow2 中 flow1 的所有 100 个元素并用它们进行一些转换。有人可以解释一下吗?或者更好地发布一个简单的小例子?或两者?


Akka 定义集合

如果你不介意使用 akka 确定的集合类型,那么你可以使用grouped函数代替:

//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                             .runWith(Sink foreach println)

用户定义的集合

如果您想控制用于缓冲区的集合类型,例如ASeq or Array:

type MyCollectionType[X] = Array[X]

def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]

然后您可以使用两个流程执行此操作。第一个 Flow 执行scan构建元素序列:

val bufferSize = 10

def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] = 
  (if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i

val buffer : Flow[Int, MyCollectionType[Int], _] = 
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }

第二个流程是filter对于大小合适的序列(即“goldiLocks”):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
  Flow[MyCollectionType[Int]].filter(_.size == bufferSize)

这两个 Flow 可以组合起来生成一个 Stream,它将生成所需的集合类型:

val stream = Source(1 to 100).via(buffer)
                             .via(goldiLocks)
                             .runWith(Sink foreach println)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink) 的相关文章

随机推荐

  • 迭代 xml 元素的有效方法

    我有一个像这样的xml a b hello b b world b a
  • python 2.7 中的 HTTP 2 请求

    在 python 中向 HTTP 1 和 HTTP 2 发出请求有什么区别吗 我可以像这样在 python 中进行 HTTP 1 x 调用 url http someURL values param1 key param2 key2 dat
  • nHibernate + wcf + Isession

    我有一个包含 3 个项目的 C 解决方案 数据 WCF 和 UI 第一个是与数据库对话的类库 它通过第二个项目公开 该项目的类型为 WCF 服务库 原因是它将在第三个项目 称为 UI 的 Asp net 应用程序 作为指向 dll 的简单
  • 如何在Python中绘制数据立方体

    我想知道是否有一种方法可以在 Python 中绘制数据立方体 我的意思是每个点都有三个坐标 x part points 0 y part points 1 z part points 2 对于每个点我都有一个标量场 t x y z 我想绘制
  • 在 Rails 的文本字段中显示链接

    在我的 Rails 3 1 应用程序中 我有一个用于评论的文本字段 我希望能够允许人们包含可点击的链接 而不仅仅是显示为纯文本的 url 以及让文本字段识别用户何时文本字段中的换行符 用户无需添加 html 我怎样才能做到这一点 如果用户将
  • 包含目录中的所有文件?

    如何实现以下代码想要实现的目标 include dir In Bash HEADER all headers h echo ifndef ALL HEADERS gt HEADER echo define ALL HEADERS gt gt
  • 在 Toad 中调试 PLSQL

    我一直在使用 PL SQL 为 Oracle db 创建包 并且我正在尝试找到一种在不使用 put line 命令的情况下调试 PL SQL 包的好方法 是否有人对如何成功调试Toad 或 SQLPlus 上的 PL SQL 包 根据 TO
  • 如何在向量中的所有点之间绘制线?

    我有一个包含二维空间中一些点的向量 我希望 MATLAB 用从每个点到每个其他点绘制的线来绘制这些点 基本上 我想要一个所有顶点都连接的图 你能用情节来做到这一点吗 如果可以 怎么做 一种解决方案是使用该函数为每个点组合创建一组索引MESH
  • 如何使用 Excel Interop 获取筛选行的范围?

    我正在为我的项目使用 Excel Interop 程序集 如果我想使用自动过滤器 那么可以使用 sheet UsedRange AutoFilter 1 SheetNames 1 Microsoft Office Interop Excel
  • 在java中轮询Http服务器(重复发送http get请求)

    当对其进行 REST 调用时 我的 Web 服务器会发送一些信息 我想不断轮询该服务器 间隔5秒后重复发送HTTP GET请求 以检查返回的信息是否有任何变化 做到这一点最有效的方法是什么 您能提供一些代码示例吗 请注意 我只想开发客户端代
  • 如何使用 Dialogflow 回复用户的姓名?

    我想知道如何回复用户的姓名 例如 AppName 你好 请问你的名字吗 User 我的名字是约翰 AppName 欢迎约翰来到 appname 我想在没有网络钩子或实现的情况下执行此操作 我只想做对话流 如果您想在其他意图中使用它并以此上下
  • 按下按钮并在java中的新窗口中打开文件

    我创建了一个 JFrame 并放置了一个文本字段和按钮 在文本字段中我放置了从文本文件读取的名称 我知道我想单击按钮并打开一个已知窗口 我想在其中放置名称 其他信息来自同一个文件 这是我的代码 这是我的主框架 package Fronten
  • jupyter 中的 r 图形 - 无法启动 png() 设备

    我在 Jupyter 中使用 R 但无法在笔记本本身中绘制图表 这是一个可重现的示例 set seed 123 mat as matrix x rnorm 100 y rnorm 100 plot mat 在朱皮特中 Error in pn
  • C# 中 LINQ 中的按多列分组

    我有一个类如下 public class ActualClass public string BookName get set public string IssuerName get set public DateTime DateOfI
  • 使用 Visual Studio 2013 构建 Qt 5.2.1 的静态版本

    几天来我一直在尝试使用 Visual Studio 2013 构建 Qt 的静态版本 我就是不明白我做错了什么 System Windows 7 64 位 Visual Studio 2013 仍安装 Visual Studio 2012
  • 7 张牌扑克手牌评估器

    有谁知道评估 7 张牌扑克牌的快速算法吗 这比简单地暴力检查 7 张牌中每 21 个 5 张牌的组合更有效 Cheers Pete 我写了一篇JavaScript 核心评估方法仅使用位操作 因此速度非常快 考虑到这一点 查看 21 种组合还
  • 如何使用 JQuery DataTables 根据每个单元格中值的子字符串对列进行排序

    假设我有一列包含格式为 P 的对象标识符 例如 P12 3767 我使用的是 1 9 1 版本的 JQuery数据表插件 http datatables net用于排序和分页 有没有办法可以忽略单元格值的前 4 个字符 P12 部分 以便我
  • Apache 反向代理的基本身份验证问题

    我想为在 Ubuntu 服务器 12 04 1 上运行的 Apache 反向代理站点添加基本身份验证 网络应用程序是Jenkins http jenkins ci org运行在 Java EE 容器上 我在中添加了以下配置httpd con
  • 制作一个包含自定义对象列表的可分割类

    我在使列表对象可解析时遇到错误 我认为读取对象时发生错误 这是我的代码 public class TestSample implements Parcelable int intValue String stirngValue privat
  • Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

    我对 Scala 和 Akka 完全陌生 我有一个简单的 RunnableFlow Source gt Flow do some transformation gt Sink runForeach 现在我想要这样的东西 Source gt