这是泛美卫生组织异步客户端:
client = new MqttAsyncClient(appProps.getProperty("mqtt.broker"),
appProps.getProperty("mqtt.clientId"), new MemoryPersistence());
client.setCallback(this);
client.connect(null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken imt) {
try {
client.subscribe(Constants.internalTopics, Constants.internalTopicQOS);
} catch (MqttException ex) {
ex.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken imt, Throwable thrwbl) {
thrwbl.printStackTrace();
}
});
这里我循环发送消息:
while (iterator.hasNext()) {
try {
client.publish("user/" + userId + "/downstream", mqttMessage);
} catch(Exception ex) {
ex.printStackTrace();
}
}
Error:
Too many publishes in progress (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:436)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:858)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:836)
我在用Rabbitmq
看着source对于 Paho 客户端,任何给定时间的默认最大飞行消息数似乎都是 10。
因此,考虑到您的发布循环有多紧,网络层的速度只会稍微减慢,并且在任何给定时间发送的过程中最终都会有超过 10 条消息。如果您尝试以大于 0 的 QOS 发送,情况只会变得更糟。
您可以使用以下命令更改默认值setMaxInflight(int n)
方法上的MQTTConnectionsOptions
传递给的对象client.connect()
method.
我建议你尝试找到一个合适的值。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)