管道:多个流消费者

2024-05-19

我编写了一个程序来计算语料库中 NGram 的频率。我已经有一个函数,它消耗一串令牌并生成一个订单的 NGram:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

目前我只能将一个流消费者连接到流源:

tokens --- trigrams --- countFreq

如何将多个流消费者连接到同一个流源? 我想要这样的东西:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

一个优点是并行运行每个消费者

EDIT:感谢 Petr 我想出了这个解决方案

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs

我假设您希望所有接收器接收所有值。

我建议:

  1. Use newBroadcastTMChan http://code.haskell.org/~wren/stm-chans/dist/doc/html/stm-chans/Control-Concurrent-STM-TMChan.html#v:newBroadcastTMChan创建一个新频道Control.Concurrent.STM.TMChan(stm-chans)。
  2. 使用此通道构建一个接收器sinkTBMChan http://hackage.haskell.org/packages/archive/stm-conduit/0.2.4.1/doc/html/Data-Conduit-TMChan.html#v:sinkTBMChan from Data.Conduit.TMChan(stm-conduit) 为您的主要制作人。
  3. 对于每个客户使用dupTMChan创建自己的副本以供阅读。启动一个新线程,该线程将使用以下命令读取此副本sourceTBMChan.
  4. 从您的线程中收集结果。
  5. 确保您的客户端能够以与生成数据一样快的速度读取数据,否则可能会出现堆溢出。

(我还没有尝试过,请让我们知道它是如何工作的。)


Update:收集结果的一种方法是创建一个MVar http://hackage.haskell.org/packages/archive/base/latest/doc/html/Control-Concurrent-MVar.html#t:MVar对于每个消费者线程。他们每个人都会putMVar完成后的结果。你的主线程会takeMVar在所有这些MVars,从而等待每个线程完成。例如如果vars是您的列表MVar是,主线程会发出mapM takeMVar vars收集所有结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

管道:多个流消费者 的相关文章

  • RankN多态性和令人发指的克莱斯利之箭

    我不明白为什么 demobind1 的定义会产生一些编译器错误 它看起来像一个愚蠢的翻转 但不知何故 LANGUAGE GADTs LANGUAGE RankNTypes ScopedTypeVariables TypeOperators
  • Haskell 项目可以使用 cmake 吗?

    我正在计划一个用 Haskell 编写的项目 也许也有一些部分是用 C 编写的 对于构建系统 我决定不选择 Haskell 程序 cabal 的常见选择 主要是因为我想了解其他语言的构建程序是如何工作的 我听说过 CMake 我认为这是一个
  • Control.Parallel.Strategies 中 Eval 的绑定运算符如何严格评估其参数?

    Control Parallel Strategies 的源代码 http hackage haskell org packages archive parallel 3 1 0 1 doc html src Control Paralle
  • 从 Penn Treebank 格式的文本中提取子句

    说我有一句话 After he had eaten the cheese Bill went to the grocery 在我的程序中 我得到以下输出 PARSE TREE ROOT S SBAR IN After S NP PRP he
  • 函数式语言中的部分求值和函数内联有什么区别?

    我知道 函数内联就是用函数定义代替函数调用 部分评估是在编译时评估程序的已知 静态 部分 在 C 等命令式语言中 两者之间存在区别 其中运算符与函数不同 但是 在像 Haskell 这样的函数式语言 其中运算符也是函数 中 两者之间有什么区
  • Haskell Fibonacci 达到最大指定数?

    我有一个已启动并正在运行的 Haskell 函数 但它做错了事情 它应该输出最多指定最大数量的斐波那契数列 像这样 fibonacciSequence 86 1 1 2 3 5 8 13 21 33 54 我的代码当前输出斐波那契数列中的前
  • 管道 - 将多个来源/生产者合并为一个

    我正在使用读取文件sourceFile 但我还需要在处理操作中引入随机性 我认为最好的方法是拥有一个这样的制片人 Producer m StdGen ByteString 其中 StdGen 用于生成随机数 我打算让生产者执行 source
  • 你将如何在 Haskell 中(重新)实现迭代?

    iterate a gt a gt a gt a 你可能知道 iterate是一个接受函数和起始值的函数 然后它将函数应用于起始值 然后将相同的函数应用于最后的结果 依此类推 Prelude gt take 5 iterate 2 2 2
  • 使用 Haskell 绘制图表

    是否可以使用 Haskell 绘制一个简单的图表 你们中的任何人都可以告诉我该怎么做吗 该图应至少包含 3 个点 Haskell 图表 https github com timbod7 haskell chart似乎不错 The wiki
  • 在列表中查找元素及其索引

    我需要让列表的两个元素都满足谓词and这些元素的索引 我可以通过以下方式实现这一点 import Data List findIndices list Int list 3 2 4 1 9 indices findIndices gt 2
  • 如何使用FeatureUnion转换PipeLine中的多个特征?

    我有一个 pandas 数据框 其中包含有关用户发送的消息的信息 对于我的模型 我感兴趣的是预测消息的缺失收件人 即给定消息的收件人 A B C 我想预测还有谁应该成为收件人的一部分 我正在使用 OneVsRestClassifier 和
  • Haskell / cabal 包的解决方法受到 Nix 和 Cabal 的限制?

    我最近开始开发反射平台 https github com reflex frp reflex platform 有一些额外的配置类似于优秀的反射项目骨架 https github com ElvishJerricco reflex proj
  • 使用 Parsec 解析正则表达式

    我正在尝试通过实现一个小型正则表达式解析器来学习秒差距 在 BNF 中 我的语法类似于 EXP EXP LIT EXP LIT 我尝试在 Haskell 中实现这一点 expr try star lt gt try litE lt gt l
  • 并行 Haskell - GHC GC 火花

    我有一个正在尝试并行化的程序 带有可运行代码的完整粘贴here http lpaste net 101528 我进行了分析 发现大部分时间都花在findNearest这本质上是一个简单的foldr超过一个大Data Map findNear
  • Haskell数据类型转换问题

    我目前正在学习 Haskell 并且一直在编写一些非常简单的程序来练习 我的程序之一是 import System IO main do putStrLn Give me year y lt getLine let res show cal
  • nltk 标记化和缩写

    我用 nltk 对文本进行标记 只是将句子输入到 wordpunct tokenizer 中 这会拆分缩写 例如 don t 到 don t 但我想将它们保留为一个单词 我正在改进我的方法 以实现更精确的文本标记化 因此我需要更深入地研究
  • Haskell if-then-else 条件中的“解析输入错误”

    当我尝试编译以下 do 块时 它会抛出错误 输入 conn 上的解析错误 我尝试了许多不同的 if then else 语句配置 但均无济于事 在我添加条件之前 数据库逻辑就起作用了 所以这没有问题 else 中是否有太多行 有没有办法在不
  • seq在haskell中代表什么

    我是 Haskell 的新手 刚刚进入惰性世界编程 我读到seq函数非常特殊 因为它强制使用严格的评估 以便在某些情况下更加有效 但我就是找不到什么seq代表字面意思 也许严格评估Q 它应该提醒您 顺序 或 顺序 因为它允许程序员指定其参数
  • 告诉阴谋集团主模块在哪里

    我有一个具有以下结构的项目 foo cabal src Foo Main hs foo cabal 的一部分如下所示 executable foo main is Foo Main hs hs source dirs src Main hs
  • 对产品列表进行分类的算法? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个代表或多或少相同的产品的列表 例如 在下面的列表中 它们都是希捷硬盘 希捷硬盘 500Go 适用于笔记本电脑的希捷硬盘 120

随机推荐

  • 使用 swig 类型映射将向量> & 从 C++ 方法返回到 python 元组列表

    我在尝试包装一个 C 方法时遇到了很多麻烦 该方法将对向量的常量引用返回到 Python 元组列表 typemap out 我目前有这样的事情 myclass h inlcude
  • Rails 3.2:用 json 序列化中的空字符串替换 null 值

    我正在使用 Rails 3 2序列化 http www simonecarletti com blog 2010 04 inside ruby on rails serializing ruby objects with json 将 ru
  • 复杂对象MVC3的JSON序列化

    我有一个问题 我似乎不知道如何序列化类型的对象 public class SchedulingCalendarMonth public List
  • TDD/测试 CSS 和 HTML? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 有没有办法测试 CSS 和 HTML 例如 有时某些通知会受到某些 CSS 更改的影响 我不想每次进行更改时都手动测试所有通知 Tha
  • 如何更新 Firebase 中的节点密钥?

    如何重命名14 04 2017 node 没有用于重命名节点的 API 您必须获取节点的值 使用新名称将其保存到数据库并删除旧节点
  • WCF 仅跟踪失败的请求?

    我想将跟踪信息保存到 svclog 文件中 但仅限于失败的请求 这可能吗 如果是这样 具体如何 我有一个每分钟调用数百次的 WCF 服务 在极少数情况下 客户端会收到错误 500 该错误发生在 WCF 内运行的代码边界之外 通常是安全问题
  • 为什么我的 Github 托管网站响应 HTTP 302 而不是 200?

    我拥有该域名penkov id au http penkov id au 我主持一个blog http michael penkov id au blog 2014 01 02 reinventing the wheel html usin
  • 是否有 .NET 库或 API 可以与 IIS 配置数据库交互/编辑它?

    或者我是否坚持使用自己的 XML 切割 功能 我想创建一个小型任务托盘应用程序 以便我可以快速将虚拟目录重新指向硬盘上的几个文件夹之一 一点背景 我的开发机器上的代码库有 3 个不同的 svn 分支 Current Production B
  • 在 R 的 for 循环中创建动态命名对象并分配动态值

    我正在尝试创建一套动态命名的新对象 例如 temp2015 使用 for 循环 并存储动态值 具体来说 其他对象的名称 例如 Y2015 和 for 循环中使用的值 例如 2015 在动态命名的新对象中 我不确定为什么下面的代码不起作用 Y
  • Windows Mobile - Compact Framework 程序即服务?

    让我的 C Compact Framework 程序在 Windows Mobile 设备上后台运行的最佳方式是什么 我需要响应不同的事件 例如带有特定内容的短信 我不想在进程启动时启动任何 UI 而是在后台运行 直到需要 UI 如何才能做
  • iPad - 无法在框架内滚动

    我无法滚动 iPad Safari 中框架内调用的 pdf 我已经尝试过两件事 2 指滚动 使用对象 嵌入代替框架 但这是行不通的 事实上我已经尝试了很多东西 溢出 高度等等 请帮我 先感谢您 根据这篇文章http support appl
  • 如何使用 ASP.NET Razor 语法应用 bootstrap v4 alpha 的表单输入验证类?

    因此 Bootstrap v4 alpha 对表单验证类进行了一些更改 现在 要将验证样式应用于表单输入 请将 CSS 类应用于父级div form group 我正在使用 ASP NET MVC4 编写一个网站 并试图弄清楚如何将此 CS
  • Firebase Analytics 禁用受众国家/地区跟踪

    我正在开发一个严格不允许位置跟踪的应用程序 我想使用 Firebase Analytic 的其他功能 例如 PageTransitions 和 Crashalitics 但如果我无法禁用受众位置跟踪 我就无法使用其中任何功能 这是我在 An
  • 如何将一个字节转换为位?

    我有一个字节数组 我想访问每个字节并想要其等效的二进制值 8 位 以便对其执行下一步操作 我听说过 BitSet 但是还有其他方法可以解决这个问题吗 谢谢 如果您只需要它的二进制字符串表示形式 您可以简单地使用Integer toStrin
  • Git - 如何将整个目录恢复到特定提交(删除任何添加的文件)

    我想恢复 git 中的目录 恢复其中的所有文件 并删除自该提交以来添加的所有文件 进行结账似乎只能满足我的第一个要求 但不会删除任何文件 我想出了最简单的解决方案 git rm path to dir git checkout
  • 如何从 JavaScript 中的字符串中删除空白字符?

    如何从 JavaScript 中的字符串中删除空白字符 修剪很容易 但我不知道如何将它们从inside字符串 例如 222 334 gt 222334 您可以使用正则表达式 如下所示来替换所有空格 var oldString 222 334
  • 如何将安卓手机从睡眠状态唤醒?

    如何以编程方式将 Android 手机从睡眠状态唤醒 挂起至内存 我不想获取任何唤醒锁 这意味着手机在禁用 CPU 的情况下进入 真正的 睡眠状态 我想我可以使用某种RTC 实时时钟 机制 有人有例子吗 Thanks 为了让Activity
  • 为什么将函数参数声明为最终的?

    我目前正在阅读 Sams 出版的 24 小时自学 Android 应用程序开发 一书 我对 Java Android 或其他方面还比较陌生 我对 ActionScript 3 有非常扎实的背景 它与 Java 有足够的相似之处 因此该语言本
  • Seaborn 将 xticks 从 float 更改为 int

    我正在绘制一个图表 其中seaborn为sns pylab为plt plt figure figsize 10 10 sns barplot y whatever y x whatever x data mydata plt xticks
  • 管道:多个流消费者

    我编写了一个程序来计算语料库中 NGram 的频率 我已经有一个函数 它消耗一串令牌并生成一个订单的 NGram ngram Monad m gt Int gt Conduit t m t trigrams ngram 3 countFre