我正在实现一个 haskell 程序,它将文件的每一行与文件中的每一行进行比较。哪些可以实施单线程如下
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
这将在 O(n^2) 时间内运行,并且必须始终将完整的整数列表保留在内存中。在我的实际程序中,该行包含更多数字,我从中构造了一个比 Int 稍微复杂的数据类型。这导致我必须处理的数据出现内存不足错误。
所以对上述单线程方案还有两点需要改进。首先,加快实际运行时间。其次,找到一种方法,不要将整个列表始终保留在内存中。我知道这需要解析整个文件 n 次。因此,将进行 O(n^2) 次比较,并解析 O(n^2) 行。这对我来说没问题,因为我宁愿有一个缓慢成功的程序,也不愿有一个失败的程序。当输入文件足够小时,我总是可以使用更简单的版本。
为了使用多个 cpu 核心,我从 Real World Haskell 中取出了 Mapreduce 实现(第 24 章,可用here).
我修改了书中的分块函数,而不是将整个文件分成块,而是返回与行一样多的块,每个块代表一个元素
tails . lines . readFile
因为我希望该程序的文件大小也可扩展,所以我最初使用lazy IO。然而,这因“打开的文件太多”而失败,我在一个问题中询问过这一点上一个问题(文件句柄被 GC 处理得太晚了)。完整的惰性 IO 版本发布在那里。
正如已接受的答案所解释的那样,严格IO可以解决问题。这确实解决了 2k 行文件的“打开文件过多”问题,但在 50k 文件上却因“内存不足”而失败。
请注意,第一个单线程实现(没有mapreduce)能够处理50k的文件。
另一种解决方案对我来说也最有吸引力,那就是使用迭代IO。我希望这能够解决文件句柄和内存资源耗尽问题。然而,我的实现仍然失败,并在 2k 行文件上出现“打开文件过多”错误。
iteratee IO版本有相同的映射减少功能与书中相同,但有所修改分块文件枚举让它与枚举器.
因此我的问题是;以下 iteratee IO 基础实现有什么问题?懒惰在哪里?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
chunkOffset :: !Int
, chunkLength :: !Int
} deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
顺便说一句,我正在跑步
Mac OS X 10.6.7(雪豹)上的 HaskellPlatform 2011.2.0
包含以下软件包:
字节串0.9.1.10
并行3.1.0.1
枚举器 0.4.8 ,带有手册here