我编写了一个程序来计算语料库中 NGram 的频率。我已经有一个函数,它消耗一串令牌并生成一个订单的 NGram:
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
目前我只能将一个流消费者连接到流源:
tokens --- trigrams --- countFreq
如何将多个流消费者连接到同一个流源?
我想要这样的东西:
.--- unigrams --- countFreq
|--- bigrams --- countFreq
tokens ----|--- trigrams --- countFreq
'--- ... --- countFreq
一个优点是并行运行每个消费者
EDIT:感谢 Petr 我想出了这个解决方案
spawnMultiple orders = do
chan <- atomically newBroadcastTMChan
results <- forM orders $ \_ -> newEmptyMVar
threads <- forM (zip results orders) $
forkIO . uncurry (sink chan)
forkIO . runResourceT $ sourceFile "test.txt"
$$ javascriptTokenizer
=$ sinkTMChan chan
forM results readMVar
where
sink chan result n = do
chan' <- atomically $ dupTMChan chan
freqs <- runResourceT $ sourceTMChan chan'
$$ ngram n
=$ frequencies
putMVar result freqs
我假设您希望所有接收器接收所有值。
我建议:
- Use newBroadcastTMChan http://code.haskell.org/~wren/stm-chans/dist/doc/html/stm-chans/Control-Concurrent-STM-TMChan.html#v:newBroadcastTMChan创建一个新频道
Control.Concurrent.STM.TMChan
(stm-chans)。
- 使用此通道构建一个接收器sinkTBMChan http://hackage.haskell.org/packages/archive/stm-conduit/0.2.4.1/doc/html/Data-Conduit-TMChan.html#v:sinkTBMChan from
Data.Conduit.TMChan
(stm-conduit) 为您的主要制作人。
- 对于每个客户使用
dupTMChan
创建自己的副本以供阅读。启动一个新线程,该线程将使用以下命令读取此副本sourceTBMChan
.
- 从您的线程中收集结果。
- 确保您的客户端能够以与生成数据一样快的速度读取数据,否则可能会出现堆溢出。
(我还没有尝试过,请让我们知道它是如何工作的。)
Update:收集结果的一种方法是创建一个MVar http://hackage.haskell.org/packages/archive/base/latest/doc/html/Control-Concurrent-MVar.html#t:MVar对于每个消费者线程。他们每个人都会putMVar
完成后的结果。你的主线程会takeMVar
在所有这些MVar
s,从而等待每个线程完成。例如如果vars
是您的列表MVar
是,主线程会发出mapM takeMVar vars
收集所有结果。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)