为什么我使用 iteratee IO 的 Mapreduce 实现(现实世界的 haskell)也会失败并出现“打开文件过多”

2023-11-24

我正在实现一个 haskell 程序,它将文件的每一行与文件中的每一行进行比较。哪些可以实施单线程如下

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

这将在 O(n^2) 时间内运行,并且必须始终将完整的整数列表保留在内存中。在我的实际程序中,该行包含更多数字,我从中构造了一个比 Int 稍微复杂的数据类型。这导致我必须处理的数据出现内存不足错误。

所以对上述单线程方案还有两点需要改进。首先,加快实际运行时间。其次,找到一种方法,不要将整个列表始终保留在内存中。我知道这需要解析整个文件 n 次。因此,将进行 O(n^2) 次比较,并解析 O(n^2) 行。这对我来说没问题,因为我宁愿有一个缓慢成功的程序,也不愿有一个失败的程序。当输入文件足够小时,我总是可以使用更简单的版本。

为了使用多个 cpu 核心,我从 Real World Haskell 中取出了 Mapreduce 实现(第 24 章,可用here).

我修改了书中的分块函数,而不是将整个文件分成块,而是返回与行一样多的块,每个块代表一个元素

tails . lines . readFile

因为我希望该程序的文件大小也可扩展,所以我最初使用lazy IO。然而,这因“打开的文件太多”而失败,我在一个问题中询问过这一点上一个问题(文件句柄被 GC 处理得太晚了)。完整的惰性 IO 版本发布在那里。

正如已接受的答案所解释的那样,严格IO可以解决问题。这确实解决了 2k 行文件的“打开文件过多”问题,但在 50k 文件上却因“内存不足”而失败。

请注意,第一个单线程实现(没有mapreduce)能够处理50k的文件。

另一种解决方案对我来说也最有吸引力,那就是使用迭代IO。我希望这能够解决文件句柄和内存资源耗尽问题。然而,我的实现仍然失败,并在 2k 行文件上出现“打开文件过多”错误。

iteratee IO版本有相同的映射减少功能与书中相同,但有所修改分块文件枚举让它与枚举器.

因此我的问题是;以下 iteratee IO 基础实现有什么问题?懒惰在哪里?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence
                              

--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

顺便说一句,我正在跑步 Mac OS X 10.6.7(雪豹)上的 HaskellPlatform 2011.2.0
包含以下软件包:
字节串0.9.1.10
并行3.1.0.1
枚举器 0.4.8 ,带有手册here


正如错误所示,打开的文件太多。我预计 Haskell 会按顺序运行大部分程序,但有些“火花”会并行。然而,正如 sclv 提到的,Haskell 总是引发评估。

这在纯函数式程序中通常不是问题,但在处理 IO(资源)时却是问题。我将 Real World Haskell 书中描述的并行性扩展得太远了。所以我的结论是,在处理 Spark 内的 IO 资源时,只在有限的范围内进行并行。在纯函数部分,过度的并行可能会成功。

因此,我的帖子的答案是,不要在整个程序上使用 MapReduce,而是在内部纯功能部分中使用。

为了显示程序实际失败的位置,我使用 --enable-executable-profiling -p 配置它,构建它,并使用 +RTS -p -hc -L30 运行它。由于可执行文件立即失败,因此没有内存分配配置文件。 .prof 文件中生成的时间分配配置文件以以下内容开头:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

chunkedEnum 返回 IO ([Enumerator Text m b], [Handle]),显然收到 495 个条目。输入文件是一个 2k 行文件,因此 lineOffsets 上的单个条目返回 2000 个偏移量的列表。 distancesUsingMapReduceIt 中没有一个条目,所以实际工作根本就没有开始!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么我使用 iteratee IO 的 Mapreduce 实现(现实世界的 haskell)也会失败并出现“打开文件过多” 的相关文章

  • Haskell:需要了解 Functor 的签名

    有人能给我解释一下 Functor 的签名吗 Prelude gt info Functor class Functor f gt where fmap a gt b gt f a gt f b lt a gt f b gt f a 我不明
  • 如何在haskell中用另一个字符串替换一个字符串

    我想用不同的字符串替换输入文件中的字符串 我正在寻找一种方法 但似乎我只能逐个字符地更改字符串 例如在我下面的代码中 replace String gt String replace replace x xs if x then y rep
  • 使用带有两个列表而不是一个列表的地图。可以筑巢吗?

    我需要多次运行一个带有两个参数的函数 我有两个包含这些参数的列表 我希望能够使用map或类似的东西用相应的参数调用函数 我要调用的函数具有以下类型 runParseTest String gt String gt IO 列表的创建方式如下
  • Django中的自动递增值

    我在 django 中有一个表并尝试自动递增它的序列号 在自定义模板中 for 循环用于变量 自定义模板 for i in getodeskview tr td 1 td td i odesk id td td i hours td td
  • 如何处理数字逻辑模拟器中的循环?

    我正在开发一个数字逻辑模拟器 以便稍后在其中构建我自己的 CPU 所以这是一个长期项目 对于没有循环的电路 例如全加器 一切都非常有效 还有像 SR 锁存器这样的电路 其中一个门的输入连接到另一个门的输出 所以我陷入了循环 因为两个门都需要
  • 在 Haskell 中将字符串转换为整数/浮点数?

    data GroceryItem CartItem ItemName Price Quantity StockItem ItemName Price Quantity makeGroceryItem String gt Float gt I
  • 根据同一 select 语句中先前计算的行(或列)计算新行(或列)

    我正在尝试根据年度销售增长预期来计算年度预期销售量 在一张表中 我有实际销量 create table Sales ProductId int Year int GrowthRate float insert into Sales valu
  • 管道 - 将多个来源/生产者合并为一个

    我正在使用读取文件sourceFile 但我还需要在处理操作中引入随机性 我认为最好的方法是拥有一个这样的制片人 Producer m StdGen ByteString 其中 StdGen 用于生成随机数 我打算让生产者执行 source
  • 你将如何在 Haskell 中(重新)实现迭代?

    iterate a gt a gt a gt a 你可能知道 iterate是一个接受函数和起始值的函数 然后它将函数应用于起始值 然后将相同的函数应用于最后的结果 依此类推 Prelude gt take 5 iterate 2 2 2
  • 浏览多个字段的值并将它们插入到同一列中

    我正在尝试使用重复行为我的 oracle apex 应用程序创建一个功能 假设我有一个车辆表 CREATE TABLE vehicles brand VARCHAR2 50 model VARCHAR2 50 comment VARCHAR
  • 如何从具有函数依赖关系的类型类中获取和使用依赖类型?

    如何从具有函数依赖关系的类型类中获取和使用依赖类型 为了澄清并给出我最近的尝试的一个例子 从我正在编写的实际代码中最小化 class Identifiable a b a gt b where if you know a you know
  • Python 比编译的 Haskell 更快?

    我有一个用 Python 和 Haskell 编写的简单脚本 它读取包含 1 000 000 个换行符分隔的整数的文件 将该文件解析为整数列表 对其进行快速排序 然后将其写入已排序的不同文件中 该文件与未排序的文件具有相同的格式 简单的 这
  • do { ... } while (0) — 它有什么用? [复制]

    这个问题在这里已经有答案了 我已经看到这个表情十多年了 我一直在努力思考它有什么好处 因为我主要在 defines 中看到它 所以我认为它对于内部作用域变量声明和使用中断 而不是 gotos 很有用 对其他方面有好处吗 你用它吗 这是 C
  • 并行 Haskell - GHC GC 火花

    我有一个正在尝试并行化的程序 带有可运行代码的完整粘贴here http lpaste net 101528 我进行了分析 发现大部分时间都花在findNearest这本质上是一个简单的foldr超过一个大Data Map findNear
  • 为什么 count 比 $count 差

    我只是在查看不同问题的答案以了解更多信息 我看到一个answer https stackoverflow com a 4891402 429850这表明在 php 中编写这样的做法是不好的做法 for i 0 i
  • 优化 Haskell 内循环

    仍在 Haskell 中进行 SHA1 实现 我现在已经有了一个有效的实现 这是内部循环 iterateBlock Int gt Word32 gt Word32 gt Word32 gt Word32 gt Word32 gt Word3
  • 如何比较 JavaScript 表格中的单元格并测试是否相等? indexOf 是如何工作的?

    我在 HTML 代码中创建了一个表格 它有 9 列和 13 行 它被 JavaScript 循环完全填满 该循环用几个数组中的人名填充它 但是 我想添加一个验证步骤 确保一行中没有两个单元格具有相同的值 并且每个单元格的值不会在其正下方的单
  • Haskell Cabal:“包间接依赖于同一包的多个版本”

    清除我的所有后cabal installed 包 我运行了以下会话 cabal update Downloading the latest package list from hackage haskell org james bast c
  • 如何与更高级别的类型合作

    玩弄教堂的数字 我遇到了无法指导 GHC 类型检查器处理高阶类型的情况 首先我写了一个版本 没有任何类型签名 module ChurchStripped where zero z z inc n z s s n z s natInteger
  • for 循环与 cor.test 在许多类别上

    我正在尝试在 R 中编写一个循环 它将循环遍历 3 个不同的物种 以计算两个连续变量 Redness 和 VarNormAbund 之间的相关性 我的循环正在运行 但 3 个物种中每一个的输出都是相同的 这让我认为循环卡在第一个物种上 co

随机推荐