我们有一些东西在 TChan 上倾销值,然后由消费者处理这些值。但消费者无法跟上,因此当生产者在通道上倾倒大量内容时,我们会使用大量内存,但消费者却无法跟上。如果通道队列达到一定大小或某种程度,是否有一种直接的方法让生产者阻塞,以便我们可以让生产者等待消费者赶上?
就像约翰的回答一样,我建议自己构建一个有界的 TChan。我的代码有所不同,因为它:
- 添加抽象(使得
BTChan
an ADT)
- 由于读取 IO 中的当前大小,消除了极端情况。
- 读取时尽量不要在 TVar 大小中构建 thunk(写入时不太重要,因为 thunk 只能是“一层深”——下一个操作总是需要评估大小)。
- 现在正在破解:http://hackage.haskell.org/package/bounded-tchan http://hackage.haskell.org/package/bounded-tchan
注意:老实说,如果我是你,我会忽略所有这些答案,只使用他的评论中链接的代码(除非它是糟糕的代码)。我敢打赌它的作用与我在这里所做的相同,但经过更多思考。
{-# LANGUAGE BangPatterns #-}
module BTChan
( BTChan
, newBTChanIO
, newBTChan
, writeBTChan
, readBTChan
) where
import Control.Concurrent.STM
data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int)
-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
szTV <- newTVarIO 0
c <- newTChanIO
return (BTChan m c szTV)
newBTChan :: Int -> STM (BTChan a)
newBTChan m
| m < 1 = error "BTChan's can not have a maximum <= 0!"
| otherwise = do
szTV <- newTVar 0
c <- newTChan
return (BTChan m c szTV)
writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
sz <- readTVar szTV
if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
x <- readTChan c
sz <- readTVar szTV
let !sz' = sz - 1
writeTVar szTV sz'
return x
sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV
STM 程序员需要注意的一些事项:
- 显式调用
retry
将产生,将你的 haskell 线程置于阻塞状态,等待其中之一的状态TVar
or TChan
进行更改以便可以重试。这是避免检查值的方法IO
并使用yield
功能。
- 与 MVar 一样,TVar 可以引用 thunk,这通常不是您想要的。也许有人应该制作一个 hackage 包来定义
STVar
, STChan
, SBTChan
and BTChan
(严格和/或有界的 TVar 和 TChan)。
- 实际上有必要写
newBTChanIO
而不是杠杆newBTChan
因为的实现new{TVar,TChan}IO
即使在以下情况下也能工作unsafePerformIO
, which atomically
不会。
编辑:通过将 TVar 分为一个供读者使用,一个供作者使用,从而减少争用,您实际上可以获得 2-5 倍的性能提升(取决于您使用的边界)。使用标准进行验证。改进版本 0.2.1 已经在 hackage 中。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)