设想 :
我们正在评估ZeroMQ
(具体来说jeroMq
)用于事件驱动机制。
应用程序是分布式的,其中多个服务(发布者和订阅者都是服务)可以存在于同一个 jvm 中或不同的节点中,这取决于部署架构。
观察
为了玩玩我创建了一个pub
/sub
图案与inproc:
作为运输,使用 jero mq(版本:0.3.5)
- 帖子发布是可以发布的(看起来是发布了,至少没有错误)
- 另一个线程中的订阅者没有收到任何内容。
Question
正在使用inproc:
随着pub
/sub
可行的?
尝试过谷歌搜索,但找不到任何具体的内容,有什么见解吗?
代码示例pub
/sub
with inproc:
使用 jero mq(版本:0.3.5)的 inproc pub sub 的工作代码示例对于后来访问这篇文章的人来说会很有用。一位出版商发布主题A
and B
,以及两个订阅者接收A
and B
分别地
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() {
startPublishing(context);
}
});
//Subscriber for topic "A"
executorService.execute(new Runnable() {
@Override
public void run() {
startFirstSubscriber(context);
}
});
// Subscriber for topic "B"
executorService.execute(new Runnable() {
@Override
public void run() {
startSecondSubscriber(context);
}
});
}
/**
* Prepare the publisher and publish
*
* @param context
*/
private static void startPublishing(Context context) {
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
// Write two messages, each with an envelope and content
try {
publisher.sendMore("A");
publisher.send("We don't want to see this");
LockSupport.parkNanos(1000);
publisher.sendMore("B");
publisher.send("We would like to see this");
} catch (Throwable e) {
e.printStackTrace();
}
}
publisher.close();
context.term();
}
/**
* Prepare and receive through the subscriber
*
* @param context
*/
private static void startFirstSubscriber(Context context) {
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber1 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
/**
* Prepare and receive though the subscriber
*
* @param context
*/
private static void startSecondSubscriber(Context context) {
// Prepare our context and subscriber
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber2 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
The ZMQ inproc运输 http://api.zeromq.org/2-1:zmq-inproc旨在在单个进程内、不同线程之间使用。当你说“可以存在于同一个jvm中或者在不同的节点中“(强调我的)我假设你的意思是你将多个进程作为分布式服务而不是单个进程中的多个线程。
如果是这样的话,那么不,你想做的事情不会起作用inproc
. PUB-SUB/inproc
在多个线程之间的单个进程中可以正常工作。
编辑以解决评论中的进一步问题:
使用类似交通工具的原因inproc
or ipc
是因为当您在正确的上下文中使用它们时,它比 tcp 传输更高效(更快)。您可以想象使用混合的传输方式,但是您始终必须在同一传输方式上进行绑定和连接才能使其工作。
这意味着每个节点最多需要三个PUB
or SUB
插座 - 一个tcp
发布者与远程主机上的节点对话,ipc
发布者与同一主机上不同进程上的节点进行通信,以及inproc
发布者与同一进程中不同线程中的节点进行通信。
实际上,在大多数情况下,您只需使用tcp
传输并且只为所有东西旋转一个套接字 -tcp
无处不在。它could如果每个套接字负责特定的任务,那么启动多个套接字是有意义的kind的信息。
如果出于某种原因,您总是会向其他线程发送一种消息类型,并向其他主机发送不同的消息类型,那么多个套接字是有意义的,但在您的情况下,从一个节点的角度来看,所有其他节点似乎都是如此是平等的。在那种情况下我会使用tcp
无处不在并结束它。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)