对 rxjs 订阅进行反跳和缓冲

2023-12-10

我有一个消息队列处理器,可以将消息提供给服务......

q.on("message", (m) => {
  service.create(m)
    .then(() => m.ack())
    .catch(() => n.nack())
})

该服务使用 RxJS Observable 并订阅debounceTime()那些请求。

class Service {
  constructor() {
    this.subject = new Subject()
    this.subject.debounceTime(1000)
      .subscribe(({ req, resolve, reject }) =>
        someOtherService.doWork(req)
          .then(() => resolve())
          .catch(() => reject())
      )
  }

  create(req) {
    return new Promise((resolve, reject) =>
      this.subject.next({
        req,
        resolve,
        reject
      })
    )
  }
}

问题是只有去抖请求才会得到 ackd/nackd。如何确保订阅也解决/拒绝其他请求?bufferTime()让我到达那里的一部分,但它不会重置每次调用的超时持续时间next().


对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为就像debounce() + buffer()就像之前的答案一样。

我叫它bufferDebounceTypescript 中带有类型推断的代码片段如下:

import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer =>
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    }),
  );

您可以在此示例中测试其行为,以检查这是否适合您https://stackblitz.com/edit/rxjs6-buffer-debounce

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

对 rxjs 订阅进行反跳和缓冲 的相关文章

  • HTTP 服务的反应式缓存

    我正在使用 RsJS 5 5 0 1 在 Angular 2 中进行缓存 它运行良好 缓存功能的核心是 const observable Observable defer gt actualFn do gt this console log
  • Jasmine 监视 RxJS 5.5 操作员

    我正在尝试使用 Jasmine 监视 RxJS 操作员 在我的测试中有不同的用例 我希望控制 Observable 返回的内容 为了说明我正在尝试做的事情 我创建了上面的示例 尽管我认为它没有多大意义 因为此可观察值总是返回相同的硬编码字符
  • 为什么我应该使用 HttpClient 而不是 fetch?

    Angular 2 介绍HttpClient它发出一个 HTTP 请求并将它们发送到一个 RxJS observable 中 我的问题是为什么我会选择使用HttpClient s API https angular io guide htt
  • Angular:仅刷新令牌一次

    我使用带有刷新令牌策略的 JWT 作为身份验证 并且我的 Angular 客户端中有一个拦截器 它将令牌作为标头发送 我在发送之前检查是否过期 并在需要时使用我的refreshToken刷新令牌 问题是当发送 2 个 或更多 请求时 两个请
  • RxJS v5 中的速率限制和计数限制事件,但也允许传递

    我有很多事件要发送到服务 但请求有速率限制 每个请求都有计数限制 每秒 1 个请求 bufferTime 1000 每个请求 100 个活动项目 bufferCount 100 问题是 我不确定如何以有意义的方式将它们组合起来 允许通过 让
  • http 请求每 x 秒一次(角度)

    我试图在 angular2 中每 x 秒刷新一次 http 调用 ionViewDidLoad let loader this LoadingController create content Please Wait loader pres
  • Http 请求的加载指示器

    我的问题的根源是在 http 请求上显示加载指示器 我想在服务级别上执行此操作 而不必为每个组件编写代码 我所做的是实现一个 http 包装器 它基本上执行以下操作 getMyHttpObservable setLoadingIndicat
  • Angular 自定义错误处理程序未从 Promise 获取错误类型

    当从承诺中抛出每个错误时 我的自定义错误处理程序都会丢失其类型 import HttpErrorResponse from angular common http import ErrorHandler Injectable Injecto
  • 非规范化 ngrx 存储 - 设置选择器?

    我目前正在 ngrx 项目中处理一个有点复杂 深层 的结构 它可以被认为是父对象的数组 具有多个级别的子对象 它在服务器端标准化 扁平化 我的商店中的功能看起来像这样 rootObjs level1 byId lvl1 1 id lvl1
  • 使用 Jest 模拟延迟() RxJS

    有没有简单的方法来嘲笑delay 例如 带有假时间的可观察对象中的 RxJS 方法 我有这个方法 register user return this checkLog user delay 500 flatMap 当我删除时delay 方法
  • subject.next 不在 ngOnInit 中触发

    有谁知道为什么这段代码 从主题初始化一个值 不起作用 是否存在错误或设计使然 我究竟做错了什么 ts import Component OnInit from angular core import Subject from rxjs Co
  • 将 React 值从子级传递给父级

    我正在努力解决一些可能非常简单的事情 我的父组件是一个搜索小部件 它需要使用在单独的抽屉组件中定义的过滤器 目前 用户可以输入搜索查询 该查询会调用 API 并且需要根据抽屉组件中的选择器来过滤结果 但是 我无法将父母和孩子联系起来以实现这
  • RxJS:Observable.combineLatest 与 Observable.forkJoin

    我想知道两者之间有什么区别Observable combineLatest and Observable forkJoin 据我所知 唯一的区别是forkJoin预计可观察量将完成 同时combineLatest返回最新值 不仅forkJo
  • Angular2 - *ngIf 和异步可观察量

    我在将 ngIf 与可观察变量一起使用时遇到问题 问题是 当我隐藏元素时 ngIf 然后再次显示 值将不会加载 因此 div someObservable async div 基本上当 showDiv 设置为true首先 加载了 someO
  • RXJS Observable - 如何从 Observable 的构造函数外部调用 next

    我正在构建一个公开 Observable 的服务 在此服务中 我接收外部函数调用 这些函数调用应该触发 Observable 上的下一个调用 以便各个消费者获得订阅事件 在观察者构造函数期间 我可以调用 next 并且一切正常 但是如何在构
  • 错误:类型 typeof Observable 上不存在属性计时器

    代码如下 import Component from angular2 core import Observable from rxjs Rx Component selector my app template Ticks every s
  • 在 Angular 2 中进行并行调用 http get 或 post 调用

    如何在 Angular 2 中进行并行调用 HTTP get 或 post 调用 我有 2 个服务电话 其中一个愈伤组织的响应必须拨打另一个电话 有人可以建议我如何通过错误处理场景进行这些并行调用吗 如果您的服务是Observable基于而
  • 如何在 Angular2 中实现间隔/轮询以与量角器一起使用?

    我有一个 angular2 应用程序 我想用量角器进行测试 在此应用程序中 我有一个带有图表的页面 该页面正在使用自动生成的数据定期更新 显然 Protractor 的一项功能是在执行测试代码之前等待脚本和 http 调用完成 但是 如果有
  • Angular 2 路由器使用 Observable 进行解析

    Angular 2 RC 5 发布后引入了路由器解析 Here https angular io docs ts latest guide router html resolve guard使用 Promise 演示了示例 如果我使用 Ob
  • Angular 中多个相同的异步管道导致多个 http 请求

    在我的角度应用程序中 我使用异步管道多次渲染组件

随机推荐

  • 在上下文菜单中定位菜单项图像(MENUITEMINFO 的 hbmpItem)

    我正在将菜单项插入到主题文本控件的 Outlook 上下文菜单中 在这里您可以找到我之前提出的有关执行此操作的问题 我遇到的问题是 菜单项的图像在 Outlook 2010 中的位置很奇怪 在 Outlook 2007 中 它的位置不同 在
  • C++ 使用 ShellExecute 打开链接

    如果我这样写 ShellExecute NULL open www google com NULL NULL SW SHOWNORMAL 一切都很好 而且都是应该的样子 但我希望用户可以输入他想去的链接 std cout lt lt Ent
  • 找到两个数组之间的最小差异[关闭]

    很难说出这里问的是什么 这个问题模棱两可 含糊不清 不完整 过于宽泛或言辞激烈 无法以目前的形式合理回答 如需帮助澄清此问题以便重新打开 访问帮助中心 给定两个排序数组 A 和 B 找到 i j 其中 A i B j 是最小值 由于数组已排
  • 添加键盘快捷键来执行 Chrome 扩展

    我创建了一个 chrome 扩展 我想使用键盘快捷键来执行它 Snippet suggested key default Ctrl Shift F 我尝试过不同的组合 例如 Ctrl Shift A Ctrl Shift D Alt X 和
  • 需要 T-SQL 查询找到所有可能的方式

    create table sample product varchar 100 Price float insert into sample values Pen 10 insert into sample values DVD 29 in
  • 滞后看不到 mutate 对前一行的影响

    我似乎偶然发现了一个mutate lag ifelse我无法解释的行为 我有以下 简化的 数据框 test lt data frame type c START END START START START START END strings
  • 跳跃列表与二叉搜索树

    我最近遇到了一种称为跳过列表 它似乎与二叉搜索树具有非常相似的行为 为什么要在二叉搜索树上使用跳跃列表 跳过列表更适合并发访问 修改 赫伯 萨特写了一篇article关于并发环境中的数据结构 它有更深入的信息 二叉搜索树最常用的实现是红黑树
  • 在 Ubuntu 中从源存储库升级 git

    我想使用升级 git源存储库在Ubuntu中 正如您所注意到的 最后一个稳定版本是 2 0 2 但我有 1 9 4 我克隆了 git 存储库 但我不知道如何继续 我想以某种方式做到这一点 我可以理解如何与存储库的分支和标签进行交互 所以我不
  • Flutter中如何使用UUID访问数据库?

    我正在使用以下函数来检索 UID FirebaseAuth auth FirebaseAuth instance getUID async final FirebaseUser user await auth currentUser fin
  • 在 SQL 中而不是在 Group By 中选择列

    我一直在尝试查找一些有关如何选择 SQL 中 Group By 语句中未包含的非聚合列的信息 但到目前为止我发现的任何内容似乎都无法回答我的问题 我有一个包含我想要的三列的表格 一个是创建日期 一个是按特定声明 ID 对记录进行分组的 ID
  • 如何将 .map() 与 Promise 结合起来? [复制]

    这个问题在这里已经有答案了 我有一个数组 对于该数组的每个元素 我需要fetch一些数据 取决于元素 并将该数据添加到数组中的元素 为了举个例子 我将模拟fetch带有 Promise 在现实生活中 这将是网络服务的答案 let p new
  • 事件监听器与事件处理程序

    有人可以解释一下在 javascript 中使某些操作调用函数的正确方法是什么吗 我应该使用事件处理程序吗onclick callFunction 或者我应该使用事件监听器 如果是 它们是如何工作的 None
  • 在 Google Document AI API 中指定文档语言

    我正在尝试使用 Google Cloud Document AI 解析手写文档 该文档包含西里尔字符 但 Document AI 有时会检测到带有拉丁字符的单词 有没有办法指定文档的语言 以便无论置信度如何 它都会尝试识别特定语言的单词 这
  • 理解 Java 中的枚举

    什么是 Java 枚举 它们是如何工作的 我可以在哪里使用它们以及如何使用它们 我可以在应用程序中不使用枚举吗 或者它们是否如此强大以至于使用它们比忽略它们更好 Java 5 中的枚举基本上是具有一组预定义实例的类 它们旨在替代整数常量的集
  • 检查用户是否被禁止或帐户实际上不存在。 Instagram,c#

    我没有使用 API 但我一直在像疯子一样研究 API 试图找到与之相关的东西 我现在正在打开该网站并下载所有内容 看看它是否包含 找不到页面 但被禁止的帐户也说了同样的事情 那么有什么已知的方法可以区分这一点吗 拨打电话以获取用户 user
  • “HttpContext.Current”属性的跨线程使用及相关事物

    我从 Essential ASP NET with Examples in C 中读到以下语句 另一个需要了解的有用属性是静态 Current 属性 HttpContext 类的 该属性始终指向当前 所服务的请求的 HttpContext
  • 在 Haskell 中使用工作池运行并行 URL 下载

    我想使用 Control Concurrent AsyncmapConcurrently执行并行下载http conduit 解决方案here不足以满足我的情况 因为我想处理n任务 但限制并发工作人员的数量m where m lt n 这还
  • Eclipse Glassfish,不发布对所需项目的更改

    我的设置如下 我正在使用 Eclipse 版本 Juno Service Release 2 构建 ID 20130225 0426 和 Glassfish 3 1 2 glassfish适配器版本为5 0 1 201201241920 我
  • pip 安装失败,并显示“ValueError:__slots__ 中的'format'与类变量冲突”

    当尝试安装 python 包时progressbar在 OSX El Capitan 上 我收到以下错误 Collecting progressbar Using cached progressbar 2 3 tar gz Complete
  • 对 rxjs 订阅进行反跳和缓冲

    我有一个消息队列处理器 可以将消息提供给服务 q on message m gt service create m then gt m ack catch gt n nack 该服务使用 RxJS Observable 并订阅debounc