如何控制ConsumerGroup处理消息的并发度

2023-12-14

我正在使用 kafka-node ConsumerGroup 来消费来自主题的消息。 ConsumerGroup在消费消息时需要调用外部API,甚至可能需要一秒钟才能响应。 我希望控制消费队列中的下一条消息,直到收到 API 的响应,以便按顺序处理消息。

我应该如何控制这种行为?


这就是我们一次处理 1 条消息的方式:

var async = require('async'); //npm install async

//intialize a local worker queue with concurrency as 1 (only 1 event is processed at a time)
var q = async.queue(function(message, cb) {
          processMessage(message).then(function(ep) {
          cb(); //this marks the completion of the processing by the worker
        });
}, 1);

// a callback function, invoked when queue is empty. 
q.drain = function() {
    consumerGroup.resume(); //resume listening new messages from the Kafka consumer group
};

//on receipt of message from kafka, push the message to local queue, which then will be processed by worker
function onMessage(message) {
  q.push(message, function (err, result) {  
    if (err) { logger.error(err); return }      
  });
  consumerGroup.pause(); //Pause kafka consumer group to not receive any more new messages
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何控制ConsumerGroup处理消息的并发度 的相关文章

随机推荐

  • 这些 Git 合并标记的简单解释是什么?

    下面参考代码段1 2 3解释Git合并标记的含义 Code from beginning of file lt lt lt lt lt lt lt HEAD code segment 1 merged common ancestors co
  • 为什么linux内核中的udelay和ndelay不准确?

    我做了一个这样的函数 trace printk 111111 udelay 4000 trace printk 222222 日志显示它是 4 01 毫秒 没问题 但当我这样打电话时 trace printk 111111 ndelay 1
  • 如何在 CORS 预检选项请求中发送自定义标头?

    我正在尝试发送 JSON 负载的 CORS 请求 我控制服务器和客户端 我在这里跟随 服务器有一个自定义标头 必须与每个请求一起发送 因此 此自定义标头使请求 不简单 因此必须使用 OPTIONS 请求对请求进行预检 我可以看到 jQuer
  • Rails 4 应用程序中的子域

    今天我遇到了一个很奇怪的现象 当开发一个每个用户都有自己的子域的 Rails 应用程序并尝试使用 Devise 来完成此操作时 我遇到了未注册的子域也会路由到根页面的情况 因此 例如 即使没有 显式 子域 它也会将我路由到主应用程序页面 也
  • Excel编程

    我想让我男朋友尝尝编程的滋味 如果由我决定 我会教Scheme Haskell 或F 但因为他更愿意学习一些对他作为财务顾问的工作有用的东西 即Excel 编程 Excel 编程有哪些选项 对于刚刚学习编程但想要完成任务的人 您会推荐哪一款
  • 将长文本换行到下拉列表中?

    我的 asp net 页面上的下拉列表中有清晰的长文本 它违反了 UI 边界并超出了 UI 的分配区域 无论如何 我可以使用 CSS 或 javascript 包裹 而不是修剪 它吗 我必须显示整个字符串 无论它有多长 更长的答案 是的 您
  • 测试数据中因子水平未知的 Predict.lm()

    我正在拟合一个模型来分解数据并进行预测 如果newdata in predict lm 包含模型未知的单个因素水平 all of predict lm 失败并返回错误 有没有好的方法可以拥有predict lm 返回模型已知的因子水平的预测
  • 为什么-2147483648在可以容纳int的情况下会自动提升为long?

    include
  • 在 Eclipse 中为从 Eclipse 启动的应用程序指定替代 JRE

    我正在尝试在 Eclipse 中为我将从 Eclipse 启动的应用程序指定一个替代 jre 我的默认值是 1 6 我需要使用 jdk 1 4 2 运行 我不确定我在以下代码中是否做了正确的事情 Path jreContainerPath
  • bash:替换“”内的变量值

    抱歉 如果问题非常简单 但我是 shell 脚本的新手 我正在尝试写这样的东西 for i in 1 20 do curl something i d something i something done 问题是第二个 i单引号内的部分 不
  • MySQL别名简写?

    我需要从两个表中选择所有列 但需要能够在结果中区分它们 是否有一种简写方法可以为结果中的每一列指定一个别名 例如 SELECT t1 AS t1 SOMETHING t2 AS SOMETHING ELSE FROM TABLE1 INNE
  • 阻止仙人掌图上的有向路径[关闭]

    Closed 这个问题需要细节或清晰度 目前不接受答案 我想找到最长的路径距离仙人掌图具有某些阻塞定向路径 For example if we have following 4 nodes 这意味着 如果我们访问 1 我们就无法访问 2 也
  • 如何在jqGrid表单中添加简单的文本标签?

    当从寻呼机使用 添加 或 编辑 表单时 我想知道如何在表单中添加一个简单的静态标签 而不创建任何对 colNames 和 colModel 产生影响的附加列 例如 我有一个非常简单的典型添加表单 它从包含一些标签和表单元素的寻呼机打开 名称
  • AutoMapper 3.1.1 和 Entity Framework 6.1 代理对象

    我意识到已经有人问过这个问题 但到目前为止我读过的解决方案似乎没有什么作用 我正在使用 Entity Framework 6 1 和 AutoMapper 3 1 1 采取以下对象 Company and CompanyListItem 我
  • 列出所有外键 PostgreSQL

    我需要一个返回的查询 表名 字段名 字段类型 约束名 到目前为止我有 select conrelid regclass AS table name regexp replace pg get constraintdef c oid 1 as
  • Gradle,命令行“cmd”、“/c”、“echo doLast!”什么也没做

    我正在读关于Gradle 执行程序并创建了以下内容build gradle task startTomcat type Exec commandLine cmd c echo init startTomcat task stopTomcat
  • 在 Javascript 中获取时区的 utc 偏移量

    我需要一个 Javascript 函数 给定时区 返回当前 UTC 偏移量 例如 theFuncIneed US Eastern gt 240 如今这已成为可能Intl API 实施Intl是基于icu4c 如果您挖掘源代码 您会发现时区名
  • 通过 spring-rabbitmq 自动重试连接到代理

    我读过这个文档片段 RabbitMQ 自动连接 拓扑恢复 自 Spring AMQP 第一个版本以来 该框架提供了其 在代理发生故障时 可以恢复自己的连接和通道 此外 如第 3 1 10 节 配置代理 中所述 RabbitAdmin 将在以
  • 注释时间序列图

    我有一个日期索引数组 x 日期时间对象 和一个实际值数组 y 债券价格 执行以下操作 plot x y 生成一个完美的时间序列图 其中 x 轴标有日期 到目前为止没有问题 但我想在某些日期添加文本 例如 在2009年10月31日 我希望显示
  • 如何控制ConsumerGroup处理消息的并发度

    我正在使用 kafka node ConsumerGroup 来消费来自主题的消息 ConsumerGroup在消费消息时需要调用外部API 甚至可能需要一秒钟才能响应 我希望控制消费队列中的下一条消息 直到收到 API 的响应 以便按顺序