使用 Scalaz 7 zipWithIndex/group enumeratees 避免内存泄漏

2024-02-10

背景

如中所述这个问题 https://stackoverflow.com/questions/19059831/asynchronous-iteratee-processing-in-scalaz,我使用 Scalaz 7 iteratees 来处理常量堆空间中的大型(即无界)数据流。

我的代码如下所示:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

问题

我似乎遇到了内存泄漏,但我对 Scalaz/FP 不够熟悉,无法知道该错误是在 Scalaz 中还是在我的代码中。直观上,我希望这段代码只需要(按顺序)PChunk- 大小的空间。

注:我发现类似的问题 https://stackoverflow.com/questions/16228154/scalaz-7-iteratee-to-process-large-zip-file-outofmemoryerror其中一个OutOfMemoryError遇到了,但我的代码没有使用consume.

Testing

我进行了一些测试来尝试找出问题所在。总而言之,只有当两者都存在时才会出现泄漏zipWithIndex and group被使用。

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

测试代码:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

问题

  • 我的代码中有错误吗?
  • 我怎样才能在恒定的堆空间中完成这项工作?

对于那些与旧的人纠缠不休的人来说,这并不能带来什么安慰。iterateeAPI,但我最近验证了等效测试通过了scalaz-stream API https://github.com/scalaz/scalaz-stream。这是一个较新的流处理 API,旨在取代iteratee.

为了完整起见,这里是测试代码:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

这应该适用于任何值n参数(前提是您愿意等待足够长的时间)——我使用 2^14 32MiB 数组进行了测试(即,随着时间的推移总共分配了 0.5 TiB 的内存)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Scalaz 7 zipWithIndex/group enumeratees 避免内存泄漏 的相关文章

随机推荐

  • 激活 wp 插件时无法创建多个自定义数据库表

    我无法在激活我的 WordPress 插件时在数据库中创建多个自定义表 它仅创建最后一个表 如此代码中提到的 书签 而不是创建所有表 用户 字段 可见性 通知 等 function your plugin options install g
  • 通过多线程重用空手道功能时出错

    以下代码适用于单线程 def validateAricle file features Articles feature validateArticle def articles id 12 call read validateArticl
  • 警报返回多次

    我正在学习 JS 并试图解决编码挑战 当我输入参数时 我应该有一个警报告诉用户发电机总量和总瓦数 问题是读码器说我正在提醒不止一个 我在做什么使警报被多次调用 这是我的第一次尝试 function changePowerTotal tota
  • 无法解析属性“CosmosDBAttribute.ConnectionStringSetting”的值

    我使用 Visual Studio 和 CosmosDB 输出开发了简单的服务总线触发器 Service Bus 和 CosmosDB 的连接字符串在 local settings json 中定义 代码在本地功能齐全 现在我已经将 Zip
  • close 是否会抛出 IOException?

    在此处提供了一些答案并阅读了一些评论之后 似乎在实践中 文件 I O 的关闭时永远不会引发 IOException 是否存在对 Stream Reader Writer 调用 close 实际上抛出 IOException 的情况 如果确实
  • Subversion 合并需要“旧风格”,即使一切看起来都是最新的?

    我最近从旧的 subversion 服务器 存储库迁移到最新版本 1 8 9 新存储库是在新服务器上从头开始创建的 旧数据是从头开始导入的 我们从旧存储库中签出代码 将其导出到本地以删除所有 SVN 绑定 然后将其重新签入新存储库中 一切看
  • 带内边距的边框不能为 0 宽度

    考虑这段代码 div box sizing border box width 0 height 0 max width 0 max height 0 border 1px solid gray padding 12px overflow h
  • 什么是序列(数据库)?我们什么时候需要它?

    即使有主键 为什么我们还要创建一个序列 主键是表中的一列 主键需要一个唯一的值 该值需要来自某个地方 序列是某些数据库产品的一个功能 它只是创建唯一的值 它只是增加一个值并返回它 它的特别之处在于 没有事务隔离 因此多个事务不能获得相同的值
  • Docker 构建错误“无法获取索引基 URL http://pypi.python.org/simple/”

    我正在构建一个 dockerfiledocker build 命令 在构建时 我遇到以下错误 Downloading unpacking requests Cannot fetch index base URL http pypi pyth
  • 如何在模拟方法中对函数进行单元测试

    我如何在这里测试DeleteAppointmentById Func
  • Javascript:解析 document.cookie JSON 对象时出现问题

    在服务器上 我将 JSON 对象存储为 cookie 使用 Django json dumps 它看起来像这样 name Simon gender M 在客户端上 当我运行 document cookie 时 我可以看到 cookie 它看
  • 如何将脚本仅应用于 Google 电子表格中的一张工作表

    我有一个谷歌电子表格 其中有两张纸 分别称为 Robin 和 Lucy 我制作 找到 修改了一个脚本 用于在每次向 A 列添加一些数据时对工作表上的数据进行排序 function onEdit event var sheet event s
  • 使用不带类型的 FileHelpers

    我有一个从另一个系统导出的 CSV 文件 其中列顺序和定义可能会更改 我发现 FileHelpers 非常适合读取 csv 文件 但除非您在编译应用程序之前知道列的顺序 否则您似乎无法使用它 我想知道是否可以以非类型化方式使用 FileHe
  • SQL查询和日期时间参数需要很长时间才能执行

    我有一个以日期时间作为参数的查询 我们观察到 如果您通过变量提供日期时间参数 则查询执行时间比直接硬编码参数要多 2 3 倍 有什么原因或解决方案到它 以下查询大约需要 5 分钟才能返回结果 Declare Date as DateTime
  • 使用 Wordpress JSON API 注册/登录用户

    我想为一个 WordPress 网站创建移动应用程序 我已经集成了WordPress json 插件 http wordpress org extend plugins json api 我不确定在哪里可以找到用户注册和登录的服务 请指教
  • ruby中如何产生延迟

    ruby中如何产生延迟 我使用了 sleep 语句 但它没有给我我想要的 puts amit sleep 10 puts scj 我希望它首先打印 amit 然后延迟 10 秒 然后打印 scj 但在上面的情况下 它会暂停 10 秒 然后一
  • fancybox 2:将缩略图放在父 div 中

    好的 我最近添加了 fancyBox http www fancyapps com 到我的网站 它很棒 不过 我想将缩略图 其功能是缩略图助手的一部分 因此位于单独的 js 文件中 放在图库图像下方 我首先尝试更改包含图像的 div 我假设
  • 是否可以重命名列?

    是否可以发出类似的东西 RENAME COLUMN col1 col2 在 Google Cloud Spanner 中 从 DDL 来看这是不可能的 如果不是 这是 Beta 版的设计选择还是限制 不 这是不可能的 目前 您只能对表中的列
  • 即使线程池中只有一个线程,也会发生并发吗?

    我正在使用 Rails 5 和 Ruby 2 4 我怎样才能弄清楚 或者你可以通过查看下面的内容来判断是否有多个线程同时运行 pool Concurrent FixedThreadPool new 1 promises links map
  • 使用 Scalaz 7 zipWithIndex/group enumeratees 避免内存泄漏

    背景 如中所述这个问题 https stackoverflow com questions 19059831 asynchronous iteratee processing in scalaz 我使用 Scalaz 7 iteratees