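I am trying to build a Conduit that receives ByteStrings as input (each chunk about 1kb in size) and produces as output concatenated ByteStrings of 512kb chunks.
This seems like it should be simple to do, but I'm having a lot of trouble: most of the strategies I've tried have only succeeded in dividing the chunks into smaller chunks, and I haven't succeeded in concatenating them into larger ones.
I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this: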
import Control.Monad (when)
import Data.Monoid ((<>))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where
    -- 'n' is the number of bytes still needed to fill the current buffer
    loop buffer n = do
      mchunk <- C.await
      case mchunk of
        Nothing ->
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
            then C.yield buffer' >> loop BL.empty chunkSize
            else loop buffer' (n - l)
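The buffering rule in this loop can be checked without conduit. The following is a minimal sketch of a pure list model, assuming only the base and bytestring packages; `chunksOfAtLeastList` is a hypothetical helper name introduced here for illustration, not part of any conduit library.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | Pure list model of the buffering loop (hypothetical helper):
-- accumulate input chunks until at least 'chunkSize' bytes are buffered,
-- emit the buffer, and start over. Oversized buffers are not split.
chunksOfAtLeastList :: Int -> [B.ByteString] -> [BL.ByteString]
chunksOfAtLeastList chunkSize = go [] 0
  where
    -- 'acc' holds the buffered chunks in reverse order;
    -- 'len' is their total size in bytes.
    go acc len []
      | len > 0   = [flush acc]   -- emit the trailing partial buffer
      | otherwise = []
    go acc len (c : cs)
      | len' >= chunkSize = flush acc' : go [] 0 cs
      | otherwise         = go acc' len' cs
      where
        acc' = c : acc
        len' = len + B.length c

    -- Restore input order and wrap the chunks as one lazy ByteString.
    flush = BL.fromChunks . reverse
```

With a chunk size of 4 and input chunks of 2, 1, 3 and 1 bytes, the model emits one 6-byte buffer followed by a trailing 1-byte buffer, which mirrors how the conduit version only yields once the threshold is reached or the input ends.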
P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.
However, this seems very verbose given all the conduit functions that deal with chunking [1][2][3][4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!
[1] https://www.fpcomplete.com/user/snoyberg/library-documentation/vectorbuilder
[2] http://hackage.haskell.org/package/conduit-combinators-0.2.8.2/docs/Data-Conduit-Combinators.html
[3] http://hackage.haskell.org/package/conduit-extra-1.1.3.2/docs/Data-Conduit-Binary.html
[4] http://hackage.haskell.org/package/conduit-extra-1.1.3.2/docs/Data-Conduit-Blaze.html
P.P.S. Is it ok to use lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation for bytestring and whether this will help, especially since I'm using BL.length, which I guess might evaluate the thunk anyway?
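On the representation question, a lazy ByteString in the bytestring package is essentially a linked list of strict chunks. The sketch below, which assumes only base and bytestring and uses made-up example data, illustrates that appending a strict chunk leaves the existing chunks intact, and that `BL.length` only walks the chunk list and sums the chunk sizes. The names `appendChunk`, `exampleBuffer` and `exampleChunkSizes` are introduced here for illustration.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | Append one strict chunk to a lazy buffer. BL.append is what (<>)
-- resolves to for lazy ByteStrings; the bytes are not copied, only the
-- list of chunks grows.
appendChunk :: BL.ByteString -> B.ByteString -> BL.ByteString
appendChunk buffer chunk = BL.append buffer (BL.fromStrict chunk)

-- | A small buffer built from three strict chunks (illustrative data).
exampleBuffer :: BL.ByteString
exampleBuffer =
  foldl appendChunk BL.empty [B.pack [1, 2, 3], B.pack [4, 5], B.pack [6]]

-- | The strict chunks are still visible inside the lazy value.
exampleChunkSizes :: [Int]
exampleChunkSizes = map B.length (BL.toChunks exampleBuffer)
```

Here `exampleChunkSizes` is `[3, 2, 1]` and `BL.length exampleBuffer` is 6. Since every chunk added to the buffer is already a fully evaluated strict ByteString, taking the length forces only the spine of the chunk list, not any pending I/O.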
Conclusion
Just to elaborate on Michael's answer and comments, I ended up with this conduit:
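import Control.Monad.Base (MonadBase)
import Control.Monad.Primitive (PrimMonad)
-- fromByteVector is assumed to come from mono-traversable's Data.ByteVector
import Data.ByteVector (fromByteVector)
import Data.Conduit ((=$=))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base)
           => Int
           -> C.Conduit B.ByteString m B.ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector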
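My understanding is that vectorBuilder pays the cost of concatenating the smaller chunks early, producing the aggregated chunks as strict bytestrings.
From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") may be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface such as a network socket. Here is my best attempt at the "lazy bytestring" version: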
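import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index strict
          -> C.Conduit strict m lazy
-- takeE runs inside the repeated sink so that each sinkLazy sees only
-- chunkSize elements; fused after CL.sequence, sinkLazy would drain the
-- whole input first. Assumes chunkSize > 0.
chunksOfE chunkSize = CL.sequence (C.takeE chunkSize =$= C.sinkLazy)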
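The "chunked chunks" idea can also be modelled outside conduit. This is a minimal sketch assuming only base and bytestring; `rechunkLazy` is a hypothetical helper name, and the model works on a plain list of strict chunks rather than a stream.

```haskell
import Data.Int (Int64)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | Pure model of exact-size lazy rechunking (hypothetical helper):
-- split the input into lazy ByteStrings of exactly 'chunkSize' bytes;
-- only the last piece may be shorter. A non-positive size yields nothing.
rechunkLazy :: Int64 -> [B.ByteString] -> [BL.ByteString]
rechunkLazy chunkSize input
  | chunkSize <= 0 = []
  | otherwise      = go (BL.fromChunks input)
  where
    go rest
      | BL.null rest = []
      | otherwise    =
          -- BL.splitAt slices the underlying strict chunks without
          -- copying their bytes.
          let (piece, rest') = BL.splitAt chunkSize rest
          in  piece : go rest'
```

Each emitted piece is itself a list of slices of the original strict chunks, so nothing is concatenated up front. Applying `BL.toStrict` to a piece would copy it into one contiguous strict ByteString, which corresponds to the cost the strict variant pays early.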