这看起来像我的 kafka 节点消费者:
var kafka = require('kafka-node');
var consumer = new Consumer(client, [], {
...
});
在某些情况下,获取的消息数量超出了我的处理能力。
有没有办法限制它(例如每秒接受不超过 1000 条消息,可能使用暂停 api?)
- 我正在使用 kafka-node,与 Java 版本相比,它的 api 似乎有限
在 Kafka 中,轮询和处理应该以协调/同步的方式进行。即,每次轮询之后,您应该先处理所有收到的数据,然后再进行下一次轮询。此模式会自动将消息数量限制为客户端可以处理的最大吞吐量。
像这样的东西(伪代码):
while(isRunning) {
messages = poll(...)
for(m : messages) {
process(m);
}
}
(这就是为什么没有参数“fetch.max.messages”的原因——你只是不需要它。)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)