如何交错流(带背压)

2023-11-23

假设我有两个可能无限的流:

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...

我想要merge流,然后使用缓慢的异步操作映射合并流(例如,在 Bacon 中)fromPromise and flatMapConcat).

我可以将它们与merge:

me = a12b3.c45d6.7e...

然后是地图

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..

如你所见greedier s2从长远来看,流具有优势。这是不受欢迎的行为.


The merge行为不好,因为我想要某种背压来进行更多交错、“公平”、“循环”合并。几个例子desired行为:

s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...

s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...

一种思考方式是s1 and s2将任务发送给同时只能处理一项任务的工作人员。和merge and flatMapConcat我会得到一个贪婪的任务管理器,但我想要一个更公平的。


我想找到一个简单而优雅的解决方案。如果它可以轻松推广到任意数量的流,那就太好了:

// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);

使用 RxJS 或其他 Rx 库的解决方案也很好。


澄清

不是 zipAsArray

我不想要:

function roundRobinPromiseMap(streams, f) {
  return Bacon.zipAsArray.apply(null, streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}

比较示例大理石图:

s1  = a.....b..............c.......
s2  = ..1.2.3......................
mm  = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based

是的,我会遇到缓冲问题

...但我也会直接unfair one:

function greedyPromiseMap(streams, f) {
  Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}

大理石图

s1    = a.........b..........c...
s2    = ..1.2.3..................
mm    = a...1...2...b...3....c...
merge = a...1...2...3...b....c...

这是一段可能有帮助的疯狂代码。

它将输入流转换为单个“值”事件流,然后将它们与“发送”事件(以及用于簿记的“结束”事件)合并。然后,它使用状态机从“值”事件构建队列,并在“发送”事件上分派值。

最初我写了一个循环节流阀,但我已将其移至要点。

这是一个非常相似的 roundRobinPromiseMap。要点中的代码已经过测试,但没有经过测试。

# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
roundRobinPromiseMap = (promiser, streams) ->
    # A bus to trigger new sends based on promise fulfillment
    promiseFulfilled = new Bacon.Bus()

    # Merge the input streams into a single, keyed stream
    theStream = Bacon.mergeAll(streams.map((s, idx) ->
        s.map((val) -> {
            type: 'value'
            index: idx
            value: val
        })
    ))
    # Merge in 'end' events
    .merge(Bacon.mergeAll(streams.map((s) ->
        s.mapEnd(-> {
            type: 'end'
        })
    )))
    # Merge in 'send' events that fire when the promise is fulfilled.
    .merge(promiseFulfilled.map({ type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
        {
            queues: streams.map(-> [])
            toPush: 0
            ended: 0
        }
        handleState

    )
    # Feed this output to the promiser
    theStream.onValue((value) ->
        Bacon.fromPromise(promiser(value)).onValue(->
            promiseFulfilled.push()
    ))

handleState = (state, baconEvent) ->
    outEvents = []

    if baconEvent.hasValue()
        # Handle a round robin event of 'value', 'send', or 'end'
        outEvents = handleRoundRobinEvent(state, baconEvent.value())
    else
        outEvents = [baconEvent]

    [state, outEvents]

handleRoundRobinEvent = (state, rrEvent) ->
    outEvents = []

    # 'value' : push onto queue
    if rrEvent.type == 'value'
        state.queues[rrEvent.index].push(rrEvent.value)
    # 'send' : send the next value by round-robin selection
    else if rrEvent.type == 'send'
        # Here's a sentinel for empty queues
        noValue = {}
        nextValue = noValue
        triedQueues = 0

        while nextValue == noValue && triedQueues < state.queues.length
            if state.queues[state.toPush].length > 0
                nextValue = state.queues[state.toPush].shift()
            state.toPush = (state.toPush + 1) % state.queues.length
            triedQueues++
        if nextValue != noValue
            outEvents.push(new Bacon.Next(nextValue))
    # 'end': Keep track of ended streams
    else if rrEvent.type == 'end'
        state.ended++

    # End the round-robin stream if all inputs have ended
    if roundRobinEnded(state)
        outEvents.push(new Bacon.End())

    outEvents

roundRobinEnded = (state) ->
    emptyQueues = allEmpty(state.queues)
    emptyQueues && state.ended == state.queues.length

allEmpty = (arrays) ->
    for a in arrays
        return false if a.length > 0
    return true
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何交错流(带背压) 的相关文章

随机推荐

  • 如何调试“安全句柄已关闭”错误

    我继承的代码不断崩溃 并出现以下错误 根本没有改变 System ObjectDisposedException Safe handle has been closed at Microsoft Win32 UnsafeNativeMeth
  • Spring bean 范围。单例和原型

    假设有两个类ClassA和ClassB 假设 ClassB 依赖于 ClassA 在配置文件中 如果我们将 ClassA 的范围定义为单例 将 ClassB 的范围定义为 Prototype 那么每次创建 ClassA 的 bean 实例时
  • ES2021 (ES12) 中的 WeakRef 和终结器是什么

    我想了解什么是WeakRef和终结器ES2021 with a 真正的简单例子 and Where使用它们 I know WeakRef是一个类 这将允许开发人员创建对对象的弱引用 而终结器或FinalizationRegistry允许您注
  • Django 使用 Allauth 进行其余身份验证

    我已经用 Allauth 实现了 django Rest auth 如果我通过 google 登录 它工作正常access token但有一种情况 某些客户端设备需要通过谷歌登录id token 如果我使用 我会收到错误id token代替
  • 改造:如何在没有内容编码的情况下解析 GZIP 响应:gzip 标头

    我正在尝试处理 GZIP 的服务器响应 响应带有标头 Content Type application x gzip 但没有标题 Content Encoding gzip 如果我使用代理添加该标头 响应就会被很好地解析 我对服务器没有任何
  • 使用 JS 更改 svg 元素的位置

    尝试制作一个在按下按钮时移动的 svg 矩形 现在我只想通过函数修改 x function modX document getElementById rectangle transform translate 295 115 var x 2
  • 进程间通信建议[关闭]

    Closed 这个问题不符合堆栈溢出指南 目前不接受答案 我正在寻找一种轻量级 快速且简单的方法来处理 Linux 计算机上某些程序之间的进程间通信 目前 我正在考虑命名管道 因为它是由操作系统本身提供的 关于性能或可用性有什么注意事项吗
  • 汇编语言 - 如何进行取模?

    x86 汇编中是否有类似模运算符或指令之类的东西 如果您的模数 除数是已知常数 并且您关心性能 请参阅this and this 对于直到运行时才知道的循环不变值 乘法逆甚至是可能的 例如看https libdivide com 但是如果没
  • 展开 折叠 展开 JTree 延迟加载的问题

    我已经使用延迟加载实现了一棵树 第一级节点是在创建树时创建的 而子节点仅在用户展开任何特定节点时创建 数据来自数据库 我们向数据库发出查询以填充子节点 实现了 TreeExpansionListener 并使用 treeExpanded 方
  • boost asio异步等待条件变量

    是否可以对 boost asio 中的条件变量执行异步等待 读取 非阻塞 如果不直接支持任何有关实现的提示 我们将不胜感激 我可以实现一个计时器 甚至每隔几毫秒就触发一次唤醒 但这种方法要差得多 我发现很难相信条件变量同步没有实现 记录 如
  • 正确地将 DateTime 从 C# 插入到 mongodb

    我尝试在 MongoDB 中插入当地时间 var time DateTime Now 03 05 2014 18 30 30 var query new QueryDocument time nowTime collection3 Inse
  • 为什么要使用 urlencode?

    我正在编写一个 Web 应用程序并学习如何对 html 链接进行 urlencode 这里的所有 urlencode 问题 参见下面的标签 都是 如何 问题 我的问题不是 如何 但为什么 即使维基百科的文章也只讨论了它的机制 http en
  • 移动 NumPy 数组中的所有索引

    我有一个像这样的 numpy 数组 x np array 0 1 2 3 4 想要创建一个数组 其中索引 0 中的值位于索引 1 中 索引 1 中的值位于索引 2 中 依此类推 我想要的输出是 y np array 0 0 1 2 3 我猜
  • 在命令行中制作java包

    虽然它可能是推荐使用的 IDE 来编码高级 java 项目 但我个人更喜欢几乎完全运行命令行 使用 gedit 作为文本编辑器 所以请不要只是告诉我 就用 eclipse 吧 或其他什么 P 我的问题是在java中通过命令创建包的方法是什么
  • 使用 group by 进行 SQL 连接的 HQL 版本

    我有两张表 Band 和 Votes Band 有一个名称和一个 id Votes 有一个 Total votes 列和一个名为 band id 的外键 该外键指向 band id 我有很多选票 在不同日期保存 我想要做的是找到每个频段的
  • 如何使用 readline 支持重新安装 ruby​​?

    我已经按照 RVM 的说明安装了 Rubyhttps github com wayneeseguin rvm installation 作为信息 我有所有档案 readline 5 2 tar gz readline 6 2 tar gz
  • 在 java eclipse 控制台中更改颜色

    有没有办法改变eclipse控制台中的文本颜色 我不是在谈论当我进入选项并将颜色从黑色更改为红色时 我的意思是 就像当我启动程序并执行代码时 它会在某个时刻改变颜色 例如 code if a 2 change text color to r
  • SimpleCov 报告使用 Spork 运行 RSpec 测试后未在 Rails 3 应用程序中生成

    我刚刚安装了简单冠状病毒gem 在我的 Rails 3 2 6 应用程序上生成代码覆盖率报告 它与 RSpec 配合得很好 但与 Spork 配合不好 我可以通过运行获得所需的正确报告rspec no drb spec 但我也想让它们与 S
  • 没有删除语句的 MySQL 复制

    我一直在寻找一种方法来防止MySQL删除语句被从站处理 我正在从事数据仓库项目 我想在将数据复制到从站后从生产服务器中删除数据 完成这件事的最佳方法是什么 谢谢 做这件事有很多种方法 Run SET SQL LOG BIN 0 在执行删除之
  • 如何交错流(带背压)

    假设我有两个可能无限的流 s1 a b c d e s2 1 2 3 4 5 6 7 我想要merge流 然后使用缓慢的异步操作映射合并流 例如 在 Bacon 中 fromPromise and flatMapConcat 我可以将它们与