ZeroMQ学习笔记(5)——高级发布订阅模式

2023-05-16

第五章 高级发布订阅模式

 何时使用发布-订阅模式
 如何处理过于慢速的订阅者(自杀蜗牛模式)
 如何设计高速订阅者(黑盒模式)
 如何监控一个发布-订阅网络(特浓咖啡模式)
 如何建立一个共享的键-值存储(克隆模式)
 如何用反应器来简化复杂的服务器
 如何使用双星模式把故障转移添加到一台服务器
一、发布-订阅模式的优点和缺点
发布-订阅模式解决了旧的多播(multicast)和群消息(group messaging)的消息传递问题。发布-订阅模式是专门针对可扩展性的,适用于将大量的数据快速地发送给许多接收者。为了获得可扩展性,发布-订阅模式去掉了接收端的应答。订阅者根本连接不到发布者,它们连接到交换机上的多播组,而发布者将其消息发送给该多播组。但是,我们也无法协调发送者和接收者,这意味着:

  1. 发布者不能判断出订阅者何时成功连接,无论是初始连接还是重新连接。
  2. 订阅者不能让发布者控制它们发送消息的速度。发布者只有一个全速设置,而订阅者必须跟上这个速度,否则就会丢失消息。
  3. 发布者不能判断出订阅者何时由于程序崩溃,网络中断等原因而消失。
    然而我们需要这些进行可靠的多播。发布-订阅模式就像是一个无线电广播:加入前的一切东西,没法获得,获得的信息多少取决于你的接收质量。
    下面是发布-订阅模式的经典失败案例:
     订阅者加入晚了,错过了服务器已经发送的消息。
     订阅者获取消息的速度太慢,所以队列堆积起来后溢出。
     订阅者可能脱落而丢失当它们离开时的消息。
     订阅者可能崩溃并重新启动,而丟失它们已经收到的全部数据。
     网络可能变得超载并删除数据。
     网络可能变得太慢,所以发布者一方队列溢出而导致发布者崩溃。
    二、发布-订阅跟踪(浓咖啡模式)
    在第2章中,有一个简单的代理。zmq_proxy()方法有三个参数:桥接在一起的前端(frontend)套接字和后端(backend)套接字,还有一个把所有消息都发到其上的捕获(capture)套接字。
    示例代码:
# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#

import time

from random import randint
from string import ascii_uppercase as uppercase
from threading import Thread

import zmq
from zmq.devices import monitored_queue

from zhelpers import zpipe

# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.

def subscriber_thread():
    ctx = zmq.Context.instance()

    # Subscribe to "A" and "B"
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    count = 0
    while count < 5:
        try:
            msg = subscriber.recv_multipart()
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        count += 1

    print ("Subscriber received %d messages" % count)


# publisher thread
# The publisher sends random messages starting with A-J:

def publisher_thread():
    ctx = zmq.Context.instance()

    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:6000")

    while True:
        string = "%s-%05d" % (uppercase[randint(0,10)], randint(0,100000))
        try:
            publisher.send(string.encode('utf-8'))
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        time.sleep(0.1)         # Wait for 1/10th second


# listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:

def listener_thread (pipe):

    # Print everything that arrives on pipe
    while True:
        try:
            print (pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted


# main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:

def main ():

    # Start child threads
    ctx = zmq.Context.instance()
    p_thread = Thread(target=publisher_thread)
    s_thread = Thread(target=subscriber_thread)
    p_thread.start()
    s_thread.start()

    pipe = zpipe(ctx)

    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")

    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()

    try:
        monitored_queue(subscriber, publisher, pipe[0], b'pub', b'sub')
    except KeyboardInterrupt:
        print ("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()

浓咖啡的工作原理是创建一个监听线程读取一个PAIR套接字,并输出得到的全部内容。管道的一端是一个PAIR套接字,另一端(另一个PAIR)是我们传递给zmq_ proxy()的套接字。在实践中,过滤有趣的消息,而获得要跟踪的内容。

三、最终值缓存
最终值缓存(LVC)解决了一个新订阅者加入网络时,如何追上错过的信息问题。该理论认为,当一个新的订阅者加入,并订阅了某些特定的主题时,发布者得到通知。然后,发布者重新广播这些主题的最后一条消息。
在大型发布-订阅系统的数据量下,发布者得到有新订阅者连接的通知是不可能的。为了使真正大规模的发布-订阅网络正常工作,需要类似于PGM的协议,它利用一个高档以太网交换机的能力将数据多播到成千上万的订阅者。PGM是一个单向协议:发布者将消息发送到交换机上的一个多播地址,然后交换机为所有感兴趣的订阅者转播。发布者永远不会看到订阅者何时加入或离开。然而,在只有几十个订阅者和数量有限的主题的低容量的网络上,我们可以使用TCP, 此时的XSUB和XPUB套接字可以互相交流。
使用ZMQ制作最终值缓存(LVC),在发布者和订阅者之间建立一个代理。
我们通过制作一个突出最坏情况的发布者和订阅者开始。发布者是病态的,它通过 立即将消息发送到1000个主题开始,然后它每秒发送一个更新到随机的主题。订阅者连接并订阅一个主题。如果没有LVC,订阅者将平均等待500秒才能获得数据。
示例代码发布者:

#
# Pathological publisher
# Sends out 1,000 topics and then one random update per second
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    publisher = ctx.socket(zmq.PUB)
    if url:
        publisher.bind(url)
    else:
        publisher.bind("tcp://*:5556")
    # Ensure subscriber connection has time to complete
    time.sleep(1)

    # Send out all 1,000 topic messages
    for topic_nbr in range(1000):
        publisher.send_multipart([
            b"%03d" % topic_nbr,
            b"Save Roger",
        ])

    while True:
        # Send one random update per second
        try:
            time.sleep(1)
            publisher.send_multipart([
                b"%03d" % randint(0,999),
                b"Off with his head!",
            ])
        except KeyboardInterrupt:
            print "interrupted"
            break

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

示例代码订阅者:

#
# Pathological subscriber
# Subscribes to one random topic and prints received messages
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    subscriber = ctx.socket(zmq.SUB)
    if url is None:
        url = "tcp://localhost:5556"
    subscriber.connect(url)

    subscription = b"%03d" % randint(0,999)
    subscriber.setsockopt(zmq.SUBSCRIBE, subscription)

    while True:
        topic, data = subscriber.recv_multipart()
        assert topic == subscription
        print data

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

示例代码最终值缓存代理:

#
# Last value cache
# Uses XPUB subscription messages to re-send data
#

import zmq

def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.connect("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # main poll loop
    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        # handle subscriptions
        # When we get a new subscription we pull data from the cache:
        if backend in events:
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])

if __name__ == '__main__':
    main()

注意:默认情况下,XPUB套接字不报告重复的订阅。

四、检测慢订阅者(自杀蜗牛模式)
在现实中使用发布-订阅模式的时候,一个常见问题是慢速订阅者。理想中,数据从发布者全速发送给订阅者。现实中,订阅者应用程序通常用解释型语言编写,要么做了很多工作,要么只是编写得不好,以至于它们无法跟上发布者的速度。
以下是一些处理慢订阅者的方法:
(1)在发布者中贮存消息。这是Gmail的做法,如果过去的几小时里没有阅读邮件的话,它会把邮件保存起来。但在高吞吐量的应用中,发布者堆积消息往往会导致内存溢出,最终崩溃。特别是当同是有多个订阅者时,或者无法用磁盘来做一个缓冲,情况就会变得更为复杂。
(2)在订阅者中贮存消息。这种做法要好的多,其实ZMQ默认的行为就是这样的。如果非得有一个人会因为内存溢出而崩溃,那也只会是订阅者,而不是发布者,这挺公平的。然而,这种做法只对瞬间消息量很大的应用才合理,订阅者只是一时处理不过来,但最终会赶上进度。但是,这还是没有解决订阅者速度过慢的问题。
(3)暂停发送消息。这也是Gmail的做法,当我的邮箱容量超过7.554GB时,新的邮件就会被Gmail拒收或丢弃。这种做法对发布者来说很有益,ZMQ中若设置了阈值(HWM),其默认行为也就是这样的。但是,我们仍不能解决慢订阅者的问题,我们只是让消息变得断断续续而已。
(4)断开与慢订阅者的连接。这是hotmail的做法,如果连续两周没有登录,它就会断开,这也是为什么我正在使用第十五个hotmail邮箱。不过这种方案在ZMQ里是行不通的,因为对于发布者而言,订阅者是不可见的,无法做相应处理。
没有一种经典的方式可以满足我们的需求,所以我们就要进行创新了。我们说服订阅者来杀死自己, 而不是断开发布者的连接。这就是自杀蜗牛模式。当订阅者检测到它自己的运行速度过慢,它会哀嚎一声,然后自杀。
订阅者如何检测这一点呢?一种方法是将消息依次排列(按顺序给它们编号),并在发 布者上使用HWM(这里的阈值就是订阅者自杀的值)。这种方案有两个问题:一、如果我们连接的多个发布者,我们要如何为消息进行编号呢?解决方法是为每一个发布者设定一个唯一的编号,作为消息编号的一部分。二、如果订阅者使用ZMQ_SUBSRIBE选项对消息进行了过滤,那么我们精心设计的消息编号机制就毫无用处了。
有些情形不会进行消息的过滤,所以消息编号还是行得通的。不过更为普遍的解决方案是,发布者为消息标注时间戳,当订阅者收到消息时会检测这个时间戳,如果其差别达到某一个值,就发出警报并自杀。当订阅者有自身的客户端或服务协议,需要保证最大延迟时间时,自杀的蜗牛模式会很合适。
示例代码:

"""
Suicidal Snail

Author: Min RK <benjaminrk@gmail.com>
"""
from __future__ import print_function
import sys
import threading
import time
from pickle import dumps, loads
import random

import zmq

from zhelpers import zpipe

# ---------------------------------------------------------------------
# This is our subscriber
# It connects to the publisher and subscribes to everything. It
# sleeps for a short time between messages to simulate doing too
# much work. If a message is more than 1 second late, it croaks.

MAX_ALLOWED_DELAY = 1.0    # secs

def subscriber(pipe):
    # Subscribe to everything
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.connect("tcp://localhost:5556")

    # Get and process messages
    while True:
        clock = loads(sub.recv())
        # Suicide snail logic
        if (time.time() - clock > MAX_ALLOWED_DELAY):
            print("E: subscriber cannot keep up, aborting", file=sys.stderr)
            break

        # Work for 1 msec plus some random additional time
        time.sleep(1e-3 * (1+2*random.random()))
    pipe.send(b"gone and died")


# ---------------------------------------------------------------------
# This is our server task
# It publishes a time-stamped message to its pub socket every 1ms.

def publisher(pipe):
    # Prepare publisher
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://*:5556")

    while True:
        # Send current clock (secs) to subscribers
        pub.send(dumps(time.time()))
        try:
            signal = pipe.recv(zmq.DONTWAIT)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # nothing to recv
                pass
            else:
                raise
        else:
            # received break message
            break
        time.sleep(1e-3)            # 1msec wait


# This main thread simply starts a client, and a server, and then
# waits for the client to signal it's died.

def main():
    ctx = zmq.Context.instance()
    pub_pipe, pub_peer = zpipe(ctx)
    sub_pipe, sub_peer = zpipe(ctx)

    pub_thread = threading.Thread(target=publisher, args=(pub_peer,))
    pub_thread.daemon=True
    pub_thread.start()
    sub_thread = threading.Thread(target=subscriber, args=(sub_peer,))
    sub_thread.daemon=True
    sub_thread.start()
    # wait for sub to finish
    sub_pipe.recv()
    # tell pub to halt
    pub_pipe.send(b"break")
    time.sleep(0.1)

if __name__ == '__main__':
    main()

示例说明:
示例程序中的消息包含了系统当前的时间戳(毫秒)。在现实应用中,应该使用时间戳作为消息头,并提供消息内容。
示例程序中的发布者和订阅者是同一个进程的两个线程。在现实应用中,他们应该是两个不同的进程。示例中这么做只是为了演示的方便
五、高速订阅者(黑箱模式)
发布-订阅模式的一个典型应用场景是大规模分布式数据处理。如要处理从证券市场上收集到的数据,可以在证券交易系统上设置一个发布者,获取价格信息,并发送给一组订阅者。如果我们有少数订阅者,我们可以使用TCP。如果订阅者到达一定的数量,那我们就应该使用可靠的广播协议。
在我们开始发布消息时,有两点需要注意:

  1. 即便只是处理很少的数据,订阅者仍有可能跟不上发布者的速度。
  2. 当处理到6M/s的数据量时,发布者和订阅者都有可能达到极限。
    首先,我们需要将订阅者设计为一种多线程的处理程序,这样我们就能在一个线程中读取消息,使用其他线程来处理消息。一般来说,我们对每种消息的处理方式都是不同的。这样一来,订阅者可以对收到的消息进行一次过滤,如根据头信息来判别。当消息满足某些条件,订阅者会将消息交给worker处理。用ZMQ的语言来说,订阅者会将消息转发给worker来处理。
    这样一来,订阅者看上去就像是一个队列装置,我们可以用各种方式去连接队列装置和worker。如我们建立单向的通信,每个worker都是相同的,可以使用PUSH和PULL套接字,分发的工作就交给ZMQ吧。这是最简单也是最快速的方式。
    在这里插入图片描述

订阅者和发布者之间的通信使用TCP或PGM协议,订阅者和worker的通信由于是在同一个进程中完成的,所以使用inproc协议。

下面我们看看如何突破瓶颈。由于订阅者是单线程的,当它的CPU占用率达到100%时,它无法使用其他的核心。单线程程序总是会遇到瓶颈的,不管是2M、6M还是更多。我们需要将工作量分配到不同的线程中去,并发地执行。
很多高性能产品使用的方案是分片,就是将工作量拆分成独立并行的流。如,一半的专题数据由一个流媒体传输,另一半由另一个流媒体传输。我们可以建立更多的流媒体,但如果CPU核心数不变,那就没有必要了。 让我们看看如何将工作量分片为两个流:
在这里插入图片描述

要让两个流全速工作,需要这样配置ZMQ:
 使用两个I/O线程,而不是一个;
 使用两个独立的网络接口;
 每个I/O线程绑定至一个网络接口;
 两个订阅者线程,分别绑定至一个核心;
 使用两个SUB套接字;
 剩余的核心供worker使用;
 worker线程同时绑定至两个订阅者线程的PUSH套接字。
创建的线程数量应和CPU核心数一致,如果我们建立的线程数量超过核心数,那其处理速度只会减少。另外,开放多个I/O线程也是没有必要的。
六、可靠的发布-订阅(克隆模式)
作为一个能工作的更大的示例,我们将遇到制作一个可靠的发布-订阅架构的问题。我们的目标是允许一组应用程序共享一些共同的状态。以下是我们面临的技术挑战:
• 有一大堆客户端应用程序,比如说,几千或几万。
• 这些应用程序将随意加入和离开网络。
• 这些应用程序必须共享一个最终一致的状态。
• 任何应用程序都可以在任何时间点更新状态。
1、集中式与分散式
• 从概念上讲,中央服务器容易理解,因为网络不是自然对称的。使用中央服务器,避免了发现、绑定与连接等的所有问题。
• —般情况下,一个完全分布式的架构在技术上更具挑战性,但最后以更简单的协议告终。也就是说,每个节点都必须以正确的方式充当服务器和客户端,这是很难的。如果分布式的架构做得正确,结果会比使用中央服务器简单。
• 中央服务器将成为高容量用例的瓶颈。如果每秒顺序处理数百万规模的消息是必需的,我们应该马上把目标对准分布式架构。
• 集中式架构比分布式架构更容易扩展到多个节点。例如,连接10000个节点到一台服务器,比将这些节点相互连接更容易实现。
2、将状态表示为键值对
首先,让我们来看看如何更新一组客户端的共享状态。我们需要决定如何表示状态,以及更新。最简单可行的格式是一个键-值存储,其中一个键-值对代表在共享状态中变化的原子单元。
修改天气服务器和客户端,使之发送键-值对,而客户端将这些键-值对存储在散列表中。这使得我们使用经典的发布-订阅模型从一台服务器发送更新到一组客户端。
所有的繁重工作都是在一个kvmsg类中完成的。
在这里插入图片描述

示例代码克隆服务器,模型一

"""
Clone server Model One

"""

import random
import time
import zmq
from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)

    publisher.bind("tcp://*:5556")
    time.sleep(0.2)

    sequence = 0
    random.seed(time.time())
    kvmap = {}

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = "%d" % random.randint(1,10000)
            kvmsg.body = "%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
    except KeyboardInterrupt:
        print " Interrupted\n%d messages out" % sequence

if __name__ == '__main__':
    main()

示例代码克隆客户端,模型一

"""
Clone Client Model One

Author: Min RK <benjaminrk@gmail.com>

"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    updates = ctx.socket(zmq.SUB)
    updates.linger = 0
    updates.setsockopt(zmq.SUBSCRIBE, '')
    updates.connect("tcp://localhost:5556")

    kvmap = {}
    sequence = 0

    while True:
        try:
            kvmsg = KVMsg.recv(updates)
        except:
            break # Interrupted
        kvmsg.store(kvmap)
        sequence += 1
    print "Interrupted\n%d messages in" % sequence


if __name__ == '__main__':
    main()

示例代码kvmsg类

"""
=====================================================================
kvsimple - simple key-value message class for example applications

Author: Min RK <benjaminrk@gmail.com>

"""

import struct # for packing integers
import sys
import zmq

class KVMsg(object):
    """
    Message is formatted on wire as 3 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: body (blob)
    """
    key = None # key (string)
    sequence = 0 # int
    body = None # blob

    def __init__(self, sequence, key=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        self.key = key
        self.body = body

    def store(self, dikt):
        """Store me in a dict if I have anything to store"""
        # this seems weird to check, but it's what the C example does
        if self.key is not None and self.body is not None:
            dikt[self.key] = self

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = '' if self.key is None else self.key
        seq_s = struct.pack('!l', self.sequence)
        body = '' if self.body is None else self.body
        socket.send_multipart([ key, seq_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        key, seq_s, body = socket.recv_multipart()
        key = key if key else None
        seq = struct.unpack('!l',seq_s)[0]
        body = body if body else None
        return cls(seq, key=key, body=body)

    def dump(self):
        if self.body is None:
            size = 0
            data='NULL'
        else:
            size = len(self.body)
            data=repr(self.body)
        print >> sys.stderr, "[seq:{seq}][key:{key}][size:{size}] {data}".format(
            seq=self.sequence,
            key=self.key,
            size=size,
            data=data,
        )

# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print " * kvmsg: ",

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = "key"
    kvmsg.body = "body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == "key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    print "OK"

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)

3、创建快照
如何处理后期加入的客户端或崩溃后重新启动的客户端。我们将在客户端做同步,如下所示:
• 客户端开始订阅服务器的更新事件,然后请求一份快照。这样就能保证这份快照是在上一次更新事件之后产生的。
• 客户端开始等待服务器的快照,并将更新事件保存在队列中,做法很简单,不要从套接字中读取消息就可以了,ZMQ会自动将这些消息保存起来,这时不应设置阈值(HWM)。
• 当客户端获取到快照后,它将再次开始读取更新事件,但是需要丢弃那些早于快照生成时间的事件。如快照生成时包含了200次更新,那客户端会从第201次更新开始读取。
• 随后,客户端就会用更新事件去更新自身的状态了。
示例代码克隆服务器,模型二

"""
Clone server Model Two

Author: Min RK <benjaminrk@gmail.com>
"""
import random
import threading
import time

import zmq

from kvsimple import KVMsg
from zhelpers import zpipe

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")

    updates, peer = zpipe(ctx)

    manager_thread = threading.Thread(target=state_manager, args=(ctx,peer))
    manager_thread.daemon=True
    manager_thread.start()


    sequence = 0
    random.seed(time.time())

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = "%d" % random.randint(1,10000)
            kvmsg.body = "%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.send(updates)
    except KeyboardInterrupt:
        print " Interrupted\n%d messages out" % sequence

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket

    Hash item data is our kvmsg object, ready to send
    """
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)


def state_manager(ctx, pipe):
    """This thread maintains the state and handles requests from clients for snapshots.
    """
    kvmap = {}
    pipe.send("READY")
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")

    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)

    sequence = 0       # Current snapshot version number
    while True:
        try:
            items = dict(poller.poll())
        except (zmq.ZMQError, KeyboardInterrupt):
            break # interrupt/context shutdown

        # Apply state update from main thread
        if pipe in items:
            kvmsg = KVMsg.recv(pipe)
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)
        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == "ICANHAZ?":
                pass
            else:
                print "E: bad request, aborting\n",
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print "Sending state shapshot=%d\n" % sequence,
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = "KTHXBAI"
            kvmsg.body = ""
            kvmsg.send(snapshot)

if __name__ == '__main__':
    main()

示例代码克隆客户端,模型二

"""
Clone client Model Two

Author: Min RK <benjaminrk@gmail.com>

"""

import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    subscriber.connect("tcp://localhost:5557")

    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send("ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            break;          # Interrupted

        if kvmsg.key == "KTHXBAI":
            sequence = kvmsg.sequence
            print "Received snapshot=%d" % sequence
            break          # Done
        kvmsg.store(kvmap)

    # Now apply pending updates, discard out-of-sequence messages
    while True:
        try:
            kvmsg = KVMsg.recv(subscriber)
        except:
            break          # Interrupted
        if kvmsg.sequence > sequence:
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)

if __name__ == '__main__':
    main()

在这里插入图片描述

4、重新发布来自客户端的更新
第二个模型中,键值更新事件都来自于服务器,构成了一个中心化的模型。但是我们需要的是一个能够在客户端进行更新的缓存,并能同步到其他客户端中。这时,服务端只是一个无状态的中间件,带来的好处有:
 我们不用太过关心服务端的可靠性,因为即使它崩溃了,我们仍能从客户端中获取完整的数据。
 我们可以使用键值缓存在动态节点之间分享数据。
客户端的键值更新事件会通过PUSH-PULL套接字传达给服务端:

示例代码克隆服务器,模型三

"""
Clone server Model Three

Author: Min RK <benjaminrk@gmail.com
"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print "I: publishing update %5d" % sequence

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == "ICANHAZ?":
                pass
            else:
                print "E: bad request, aborting\n",
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print "Sending state shapshot=%d\n" % sequence,
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = "KTHXBAI"
            kvmsg.body = ""
            kvmsg.send(snapshot)

    print " Interrupted\n%d messages handled" % sequence


if __name__ == '__main__':
    main()

在这里插入图片描述

示例代码克隆客户端,模型三

"""
Clone client Model Three

Author: Min RK <benjaminrk@gmail.com
"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send("ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == "KTHXBAI":
            sequence = kvmsg.sequence
            print "I: Received snapshot=%d" % sequence
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print "I: received update=%d" % sequence

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = "%d" % random.randint(1,10000)
            kvmsg.body = "%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print " Interrupted\n%d messages in" % sequence

if __name__ == '__main__':
    main()

几点说明:
服务端整合为一个线程,负责收集来自客户端的更新事件并转发给其他客户端。它使用PULL套接字获取更新事件,ROUTER套接字处理快照请求,以及PUB套接字发布更新事件。
客户端会每隔1秒左右发送随机的更新事件给服务端,现实中这一动作由应用程序触发。
5、处理子树
现实中的键值缓存会越变越多,而客户端可能只会需要部分缓存。我们可以使用子树的方式来实现:客户端在发送快照请求时告诉服务端它需要的子树,在订阅更新事件时也指明子树。
关于子树的语法有两种,一种是“分层路径”结构,另一种是“主题树”:
 路径层次结构:/ some / list / of / paths
 主题树:一些主题列表
示例代码克隆服务器,模型四

"""
Clone server Model Four

Author: Min RK <benjaminrk@gmail.com
"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print "I: publishing update %5d" % sequence

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity, request, subtree = msg
            if request == "ICANHAZ?":
                pass
            else:
                print "E: bad request, aborting\n",
                break

            # Send state snapshot to client
            route = Route(snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print "Sending state shapshot=%d\n" % sequence,
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = "KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(snapshot)

    print " Interrupted\n%d messages handled" % sequence


if __name__ == '__main__':
    main()

示例代码克隆客户端,模型四

"""
Clone client Model Four

Author: Min RK <benjaminrk@gmail.com
"""

import random
import time

import zmq

from kvsimple import KVMsg

SUBTREE = "/client/"

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE)
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send_multipart(["ICANHAZ?", SUBTREE])
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            raise
            return          # Interrupted

        if kvmsg.key == "KTHXBAI":
            sequence = kvmsg.sequence
            print "I: Received snapshot=%d" % sequence
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print "I: received update=%d" % sequence

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = SUBTREE + "%d" % random.randint(1,10000)
            kvmsg.body = "%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print " Interrupted\n%d messages in" % sequence

if __name__ == '__main__':
    main()

6、临时值 瞬间值指的是那些会立刻过期的值。如果你用克隆模式搭建一个类似DNS的服务时,就可以用瞬间值来模拟动态DNS解析了。当节点连接网络,对外发布它的地址,并不断地更新地址。如果节点断开连接,则它的地址也会失效。 瞬间值可以和会话(session)联系起来,当会话结束时,瞬间值也就失效了。克隆模式中,会话是由客户端定义的,并会在客户端断开连接时消亡。 更简单的方法是为每一个瞬间值设定一个过期时间,客户端会不断延长这个时间,当断开连接时这个时间将得不到更新,服务器就会自动将其删除。 我们会用这种简单的方法来实现瞬间值,因为太过复杂的方法可能不值当,它们的差别仅在性能上体现。如果客户端有很多瞬间值,那为每个值设定过期时间是恰当的;如果瞬间值到达一定的量,那最好还是将其和会话相关联,统一进行过期处理。 首先,我们需要设法在键值对消息中加入过期时间。我们可以增加一个消息帧,但这样一来每当我们需要增加消息内容时就需要修改kvmsg类库了,这并不合适。所以,我们一次性增加一个“属性”消息帧,用于添加不同的消息属性。 其次,我们需要设法删除这条数据。目前为止服务端和客户端会盲目地增改哈希表中的数据,我们可以这样定义:当消息的值是空的,则表示删除这个键的数据。 示例代码kvmsg类完整的版本

"""
=====================================================================
kvmsg - key-value message class for example applications

Author: Min RK <benjaminrk@gmail.com>

"""

import struct # for packing integers
import sys
from uuid import uuid4

import zmq
# zmq.jsonapi ensures bytes, instead of unicode:

def encode_properties(properties_dict):
    prop_s = b""
    for key, value in properties_dict.items():
        prop_s += b"%s=%s\n" % (key, value)
    return prop_s


def decode_properties(prop_s):
    prop = {}
    line_array = prop_s.split(b"\n")

    for line in line_array:
        try:
            key, value = line.split(b"=")
            prop[key] = value
        except ValueError as e:
            #Catch empty line
            pass

    return prop

class KVMsg(object):
    """
    Message is formatted on wire as 5 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: uuid (blob, 16 bytes)
    frame 3: properties (0MQ string)
    frame 4: body (blob)
    """
    key = None
    sequence = 0
    uuid=None
    properties = None
    body = None

    def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        if uuid is None:
            uuid = uuid4().bytes
        self.uuid = uuid
        self.key = key
        self.properties = {} if properties is None else properties
        self.body = body

    # dictionary access maps to properties:
    def __getitem__(self, k):
        return self.properties[k]

    def __setitem__(self, k, v):
        self.properties[k] = v

    def get(self, k, default=None):
        return self.properties.get(k, default)

    def store(self, dikt):
        """Store me in a dict if I have anything to store
        else delete me from the dict."""
        if self.key is not None and self.body is not None:
            dikt[self.key] = self
        elif self.key in dikt:
            del dikt[self.key]

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = b'' if self.key is None else self.key
        seq_s = struct.pack('!q', self.sequence)
        body = b'' if self.body is None else self.body
        prop_s = encode_properties(self.properties)
        socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        return cls.from_msg(socket.recv_multipart())

    @classmethod
    def from_msg(cls, msg):
        """Construct key-value message from a multipart message"""
        key, seq_s, uuid, prop_s, body = msg
        key = key if key else None
        seq = struct.unpack('!q',seq_s)[0]
        body = body if body else None
        prop = decode_properties(prop_s)
        return cls(seq, uuid=uuid, key=key, properties=prop, body=body)

    def __repr__(self):
        if self.body is None:
            size = 0
            data=b'NULL'
        else:
            size = len(self.body)
            data = repr(self.body)

        mstr = "[seq:{seq}][key:{key}][size:{size}][props:{props}][data:{data}]".format(
            seq=self.sequence,
            # uuid=hexlify(self.uuid),
            key=self.key,
            size=size,
            props=encode_properties(self.properties),
            data=data,
        )
        return mstr


    def dump(self):
        print("<<", str(self), ">>", file=sys.stderr)
# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print(" * kvmsg: ", end='')

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = b"key"
    kvmsg.body = b"body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == b"key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    # test send/recv with properties:
    kvmsg = KVMsg(2, key=b"key", body=b"body")
    kvmsg[b"prop1"] = b"value1"
    kvmsg[b"prop2"] = b"value2"
    kvmsg[b"prop3"] = b"value3"
    assert kvmsg[b"prop1"] == b"value1"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    # ensure properties were preserved
    assert kvmsg2.key == kvmsg.key
    assert kvmsg2.body == kvmsg.body
    assert kvmsg2.properties == kvmsg.properties
    assert kvmsg2[b"prop2"] == kvmsg[b"prop2"]

    print("OK")

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)

7、使用反应器
	使用反应器使得代码更加冗长,但更容易理解和构建,因为服务器的每一个部件都由一个单独的反应器处理程序。
	存在三个反应器处理程序:
•	一个用于处理在ROUTER套接字上传入的快照请求。
•	一个用于处理来自客户端的更新,在PULL套接字上传入。
•	一个用于将已超过其TTL的临时值置为过期。
示例代码克隆服务器,模型五
"""
Clone server Model Five

Author: Min RK <benjaminrk@gmail.com
"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    loop = None                 # IOLoop reactor
    port = None                 # Main port we're working on
    sequence = 0                # How many updates we're at
    snapshot = None             # Handle snapshot requests
    publisher = None            # Publish updates to clients
    collector = None            # Collect updates from clients

    def __init__(self, port=5556):
        self.port = port
        self.ctx = zmq.Context()
        self.kvmap = {}
        self.loop = IOLoop.instance()

        # Set up our clone server sockets
        self.snapshot  = self.ctx.socket(zmq.ROUTER)
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.PULL)
        self.snapshot.bind("tcp://*:%d" % self.port)
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.snapshot = ZMQStream(self.snapshot)
        self.publisher = ZMQStream(self.publisher)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.snapshot.on_recv(self.handle_snapshot)
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)


    def start(self):
        # Run reactor until process interrupted
        self.flush_callback.start()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, msg):
        """snapshot requests"""
        if len(msg) != 3 or msg[1] != b"ICANHAZ?":
            print("E: bad request, aborting")
            dump(msg)
            self.loop.stop()
            return
        identity, request, subtree = msg
        if subtree:
            # Send state snapshot to client
            route = Route(self.snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state shapshot=%d" % self.sequence)
            self.snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(self.snapshot)

    def handle_collect(self, msg):
        """Collect updates from clients"""
        kvmsg = KVMsg.from_msg(msg)
        self.sequence += 1
        kvmsg.sequence = self.sequence
        kvmsg.send(self.publisher)
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
        kvmsg.store(self.kvmap)
        logging.info("I: publishing update=%d", self.sequence)

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        for key,kvmsg in list(self.kvmap.items()):
            # used list() to exhaust the iterator before deleting from the dict
            self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

def main():
    clone = CloneServer()
    clone.start()

if __name__ == '__main__':
    main()

8、在双型模式中添加可靠性
	模型六在模型五的基础上引入了以下更改:
•	使用一个发布-订阅流,而不是一个推送-提取流来把客户端的更新发送到服务器。 这负责同时扇出更新到两台服务器。否则,我们不得不使用两个DEALER套接字。
•	把信号检测增加到服务器的更新中(对客户端),使得客户端可以检测到主服务器已 经死机这种情况。然后,它可以切换到备份服务器。
•	使用双星反应器类连接两台服务器。双星依赖于客户端通过做出明确的请求 到它们认为“活动”的服务器来“投票”,我们将使用快照请求作为表决机制。
•	通过添加一个UU1D字段使得所有更新消息都可唯一辨认。客户端生成此字段,而 服务器在重新发布的更新中将它传播回来。
•	被动服务器保留更新的“待定清单”,该清单保存的是它已经从客户端收到但尚未从 活动服务器收到的更新,以及它从活动服务器收到,但尚未从客户端收到的更新。 该清单是从旧到新排序的,所以很容易移除最前面的更新。
示例代码克隆服务器,模型六
"""
Clone server Model Six

Author: Min RK <benjaminrk@gmail.com
"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from bstar import BinaryStar
from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    bstar = None                # Binary Star
    sequence = 0                # How many updates so far
    port = None                 # Main port we're working on
    peer = None                 # Main port of our peer
    publisher = None            # Publish updates and hugz
    collector = None            # Collect updates from clients
    subscriber = None           # Get updates from peer
    pending = None              # Pending updates from client
    primary = False             # True if we're primary
    master = False              # True if we're master
    slave = False               # True if we're slave

    def __init__(self, primary=True, ports=(5556,5566)):
        self.primary = primary
        if primary:
            self.port, self.peer = ports
            frontend = "tcp://*:5003"
            backend  = "tcp://localhost:5004"
            self.kvmap = {}
        else:
            self.peer, self.port = ports
            frontend = "tcp://*:5004"
            backend  = "tcp://localhost:5003"

        self.ctx = zmq.Context.instance()
        self.pending = []
        self.bstar = BinaryStar(primary, frontend, backend)

        self.bstar.register_voter("tcp://*:%i" % self.port, zmq.ROUTER, self.handle_snapshot)

        # Set up our clone server sockets
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.SUB)
        self.collector.setsockopt(zmq.SUBSCRIBE, b'')
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Set up our own clone client interface to peer
        self.subscriber = self.ctx.socket(zmq.SUB)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, b'')
        self.subscriber.connect("tcp://localhost:%d" % (self.peer + 1))

        # Register state change handlers
        self.bstar.master_callback = self.become_master
        self.bstar.slave_callback = self.become_slave

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.publisher = ZMQStream(self.publisher)
        self.subscriber = ZMQStream(self.subscriber)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)
        self.hugz_callback = PeriodicCallback(self.send_hugz, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)

    def start(self):
        # start periodic callbacks
        self.flush_callback.start()
        self.hugz_callback.start()
        # Run bstar reactor until process interrupted
        try:
            self.bstar.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, socket, msg):
        """snapshot requests"""
        if msg[1] != b"ICANHAZ?" or len(msg) != 3:
            logging.error("E: bad request, aborting")
            dump(msg)
            self.bstar.loop.stop()
            return
        identity, request = msg[:2]
        if len(msg) >= 3:
            subtree = msg[2]
            # Send state snapshot to client
            route = Route(socket, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state shapshot=%d" % self.sequence)
            socket.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(socket)

    def handle_collect(self, msg):
        """Collect updates from clients

        If we're master, we apply these to the kvmap
        If we're slave, or unsure, we queue them on our pending list
        """
        kvmsg = KVMsg.from_msg(msg)
        if self.master:
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            ttl = float(kvmsg.get(b'ttl', 0))
            if ttl:
                kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
            kvmsg.store(self.kvmap)
            logging.info("I: publishing update=%d", self.sequence)
        else:
            # If we already got message from master, drop it, else
            # hold on pending list
            if not self.was_pending(kvmsg):
                self.pending.append(kvmsg)

    def was_pending(self, kvmsg):
        """If message was already on pending list, remove and return True.
        Else return False.
        """
        found = False
        for idx, held in enumerate(self.pending):
            if held.uuid == kvmsg.uuid:
                found = True
                break
        if found:
            self.pending.pop(idx)
        return found

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        if self.kvmap:
            for key,kvmsg in list(self.kvmap.items()):
                self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            logging.info("I: preparing to publish delete=%s", kvmsg.properties)
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

    def send_hugz(self):
        """Send hugz to anyone listening on the publisher socket"""
        kvmsg = KVMsg(self.sequence)
        kvmsg.key = b"HUGZ"
        kvmsg.body = b""
        kvmsg.send(self.publisher)

    # ---------------------------------------------------------------------
    # State change handlers

    def become_master(self):
        """We're becoming master

        The backup server applies its pending list to its own hash table,
        and then starts to process state snapshot requests.
        """
        self.master = True
        self.slave = False
        # stop receiving subscriber updates while we are master
        self.subscriber.stop_on_recv()

        # Apply pending list to own kvmap
        while self.pending:
            kvmsg = self.pending.pop(0)
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.store(self.kvmap)
            logging.info ("I: publishing pending=%d", self.sequence)


    def become_slave(self):
        """We're becoming slave"""
        # clear kvmap
        self.kvmap = None
        self.master = False
        self.slave = True
        self.subscriber.on_recv(self.handle_subscriber)

    def handle_subscriber(self, msg):
        """Collect updates from peer (master)
        We're always slave when we get these updates
        """
        if self.master:
            logging.warn("received subscriber message, but we are master %s", msg)
            return

        # Get state snapshot if necessary
        if self.kvmap is None:
            self.kvmap = {}
            snapshot = self.ctx.socket(zmq.DEALER)
            snapshot.linger = 0
            snapshot.connect("tcp://localhost:%i" % self.peer)

            logging.info ("I: asking for snapshot from: tcp://localhost:%d",
                        self.peer)
            snapshot.send_multipart([b"ICANHAZ?", b''])
            while True:
                try:
                    kvmsg = KVMsg.recv(snapshot)
                except KeyboardInterrupt:
                    # Interrupted
                    self.bstar.loop.stop()
                    return
                if kvmsg.key == b"KTHXBAI":
                    self.sequence = kvmsg.sequence
                    break          # Done
                kvmsg.store(self.kvmap)

            logging.info ("I: received snapshot=%d", self.sequence)

        # Find and remove update off pending list
        kvmsg = KVMsg.from_msg(msg)
        # update float ttl -> timestamp
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)

        if kvmsg.key != b"HUGZ":
            if not self.was_pending(kvmsg):
                # If master update came before client update, flip it
                # around, store master update (with sequence) on pending
                # list and use to clear client update when it comes later
                self.pending.append(kvmsg)

            # If update is more recent than our kvmap, apply it
            if (kvmsg.sequence > self.sequence):
                self.sequence = kvmsg.sequence
                kvmsg.store(self.kvmap)
                logging.info ("I: received update=%d", self.sequence)


def main():
    import sys
    if '-p' in sys.argv:
        primary = True
    elif '-b' in sys.argv:
        primary = False
    else:
        print("Usage: clonesrv6.py { -p | -b }")
        sys.exit(1)
    clone = CloneServer(primary)
    clone.start()

if __name__ == '__main__':
    main()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZeroMQ学习笔记(5)——高级发布订阅模式 的相关文章

  • 无需虚拟机或双系统!Windows下愉快地使用Linux子系统

    文章目录 说明一 启用 适用于Linux的Windows子系统 WSL 二 启用开发人员模式三 下载linux安装包相关深入学习Ubuntu子系统的资料参考资料 本文阅读3分钟 xff0c 你将获得如下方案 xff0c 在轻量使用场景下 x
  • 解决GitHub的raw.githubusercontent.com无法连接问题

    问题描述 xff1a Ubuntu下连接raw githubusercontent com失败 wget https raw githubusercontent com madmashup targeted marketing predic
  • k2pdfopt详细教程-让kindle看遍所有pdf

    pdf拿什么拯救6寸kindle救世主登场一步一步解决图文混排扫描版pdf 书籍总结 pdf xff0c 拿什么拯救6寸kindle kindle现在已经出道paperwhite第三代了 xff08 2015年7月 xff09 xff0c
  • 图像处理PSNR及其计算(OpenCV和matlab实现)

    图像PSNR及其计算OpenCV和matlab实现 PSNR的概念PSNR编程实现 matlab实现 第一种实现方法第二种直观方法第三种实现方法 OpenCV实现 参考资料 图像PSNR及其计算 xff08 OpenCV和matlab实现
  • Caffe技巧之使用snapshot来继续网络训练

    Caffe技巧之使用snapshot来继续网络训练 Caffe技巧之使用snapshot来继续网络训练 Step 1设置solverprototxtStep 2设置运行脚本sh 有时候想在已经训练好的网络上继续之前的训练 xff0c 那么可
  • 鸿蒙最新功能及承载设备详解:HarmonyOS 2及华为全场景新品发布会全纪录

    6月2日 xff0c 华为联手CSDN直播了 HarmonyOS 2及华为全场景新品发布会 xff0c 老猿全程观看直播 xff0c 并进行了回看 xff0c 力争将发布会的核心内容在本文中概要性地呈现 一 一生万物 万物归一 首先是华为消
  • python删除网页html元素

    找到标签id 以id来删除 js span class token operator 61 span span class token string 39 var child 61 document getElementById 34 ex
  • 10.面向对象分析OOA笔记

    文章目录 概述需求陈述建立对象模型典型步骤 建立动态模型典型步骤 建立功能模型数据流图画法 定义服务 概述 识别出问题域内的类和对象 xff0c 分析它们之间的关系 xff0c 建立问题域的正确模型 三种模型中 xff0c 对象模型是最重要
  • 12.软件项目管理笔记

    文章目录 估算软件规模代码行技术KLOC功能点技术FP 估算工作量进度计划人员组织质量保证软件配置管理能力成熟度模型CMM 估算软件规模 代码行技术KLOC xff08 最小规模平均值a 43 4 最可能规模平均值 43 最大规模平均值b
  • 0.各种规格描述技术总结

    文章目录 结构化分析与设计面向对象分析与设计软件项目管理 结构化分析与设计 系统流程图 xff1a 描绘物理系统 E R图 xff1a 数据模型 层次方框图 xff1a 描绘数据结构 xff0c 数据模型 Warnier图 xff1a 描绘
  • 如何更改Git的端口号

    方法一 直接修改URL为SSH 开头 打开gitbash xff0c 进入仓库 xff0c 输入指令 xff1a git remote set url origin ssh git 64 domain com 1234 home git Y
  • vtk多平面重建(MPR)源码

    include 34 vtkSmartPointer h 34 include 34 vtkActor h 34 include 34 vtkCamera h 34 include 34 vtkCellPicker h 34 include
  • 0.1 + 0.2 不等于0.3 问题,精度的丢失和解决办法

    10个0 1相加不等于1 xff1b 这是因为浮点数精度丢失的问题 xff0c 首先知道在计算机中数字是以二进制的方式存在的 xff0c 那么在CPU中计算0 1 43 0 1时实际上是0 1的二进制的相加 xff1b xff08 0 1
  • Raspberrypi 3 系统备份还原, 基于最小系统镜像实现

    Raspberrypi 3 备份还原系统 一 为什么要备份系统 xff1f 1 经常在树莓派上调试程序 xff0c 安装各种软件 xff0c 越来越多的库和程序的安装带来的系统更改几乎是不可逆的 xff0c 一旦某个程序或者驱动出现问题 x
  • OpenStack(Kilo) + Tenant-OVS-VXLAN(ml2) + Multi-Ext-Net

    from http blog sina com cn s blog 6de3aa8a0102vl7m html 使用VirualBox创建CentOS7虚拟机 资源分配视宿主windows而定 xff0c 由于要部署OpenStack xf
  • 利用策略模式优化if-else

    一 定义 策略模式 Strategy Pattern 策略模式属于对象的行为模式 其用意是针对一组算法 xff0c 将每一个算法封装到具有共同接口的独立的类中 xff0c 从而使得它们可以相互替换 策略模式使得算法可以在不影响到客户端的情况
  • web技术分享| 【高德地图】实现自定义的轨迹回放

    实现 轨迹回放 方式有两种 xff1a 第一种是使用 JS API 和 AMap PolyLine xff08 折线 xff09 等图形配合实现 第二种是使用 JS API 和 AMapUI 组件库 配合使用 xff0c 利用 PathSi
  • ubuntu14.04 忘记了普通用户密码和root密码

    步骤一 xff1a 必须先找回ROOT xff0c 才可以往下做 本文使用的Ubuntu版本为14 04 4 xff0c 具体过程如下为 xff1a 1 重启电脑长按shift键直到进入下图进入GRUB引导模式 xff0c 选择第二行Ubu
  • 乐优商城介绍

    1 乐优商城介绍 1 1 项目介绍 乐优商城是一个全品类的电商购物网站 xff08 B2C xff09 用户可以在线购买商品 加入购物车 下单 秒杀商品可以品论已购买商品管理员可以在后台管理商品的上下架 促销活动管理员可以监控商品销售状况客
  • 使用java中replaceAll方法替换字符串中的反斜杠

    今天在项目中使用java中replaceAll方法将字符串中的反斜杠 34 34 替换成空字符串 34 34 xff0c 结果出现如下的异常 xff1a 1 java util regex PatternSyntaxException Un

随机推荐

  • 正则表达式匹配引号中间的内容怎么写?

    字符串 123 abc 456 匹配结果 abc Answer1 利用先行和后发断言规则 xff1a lt 61 34 61 34 最近总结了一篇关于正则表达式的博文 xff0c 题主不妨一读 xff1a 正则表达式基础 测试代码如下 xf
  • excel将一个工作表根据条件拆分成多个工作表图文教程

    本例介绍在excel中如何将一个工作表根据条件拆分成多个工作表 注意 xff1a 很多朋友反映sheets i delete这句代码出错 xff0c 要注意下面第一个步骤 xff0c 要拆分的数据工作表名称为 数据源 xff0c 而不是你新
  • python3下cv2.imwrite存储带有中文路径

    由于imwrite前使用编码在python3中已经不适用 xff0c 可用imencode代替 xff0c 以下代码是从视频中获取第2帧保存在中文文件夹下的实例 xff1a cap 61 cv2 VideoCapture 34 mp4 34
  • tf.placeholder、feed_dict用法说明

    函数形式 xff1a tf placeholder dtype shape 61 None name 61 None 参数 xff1a dtype xff1a 数据类型 常用的是tf float32 tf float64等数值类型 shap
  • sess.run()

    函数 xff1a run fetches feed dict 61 None options 61 None run metadata 61 None 当构建完图后 xff0c 需要在一个session会话中启动图 xff0c 第一步是创建
  • 卷积神经网络之AlexNet网络详解

    一 介绍 Alex Krizhevsky等人训练了一个大型的卷积神经网络用来把ImageNet LSVRC 2010比赛中120万张高分辨率的图像分为1000个不同的类别 在测试卷上 xff0c 获得很高准确率 top 1 and top
  • 如何分清分布式、高并发与多线程

    当提起这三个词的时候 xff0c 是不是很多人都认为分布式 61 高并发 61 多线程 xff1f 当面试官问到高并发系统可以采用哪些手段来解决 xff0c 或者被问到分布式系统如何解决一致性的问题 xff0c 是不是一脸懵逼 xff1f
  • 利用正则表达式排除特定字符串

    查找不以baidu开头的字符串 baidu com sina com cn 正则 xff1a baidu 匹配结果就是第2行 xff0c 也就是第1行被排除了 这里使用了零宽度断言 exp 注意 xff0c 我们有一个向前查找的语法 也叫顺
  • 自学记录--python小知识

    os path 的一些功能 根据实际项目中的例子来理解一下大体的用法 xff0c 目前只接触了几个方法 例1 xff1a 我是在c python django ttsx2 ttsx goods views py工作 xff0c 运行环境是在
  • django配置连接多个数据库,和把应用名字在admin后台显示为中文

    django配置连接多个数据库 xff0c 自定义表名称 在项目tt下新建两个app xff0c 分别为app01 app02 配置app01使用default节点数据库 xff1b app02使用hvdb节点数据库 xff08 也可以配置
  • 自学记录--字符串,列表,字典的常用方法

    字符串常见操作 如有字符串mystr 61 39 hello world itcast and itcastcpp 39 xff0c 以下是常见的操作 lt 1 gt find 检测 str 是否包含在 mystr中 xff0c 如果是返回
  • 自学记录--django模型使用记录

    对于重要数据都做逻辑删除 xff0c 不做物理删除 xff0c 实现方法是定义isDelete字段 xff0c 类型为BooleanField 默认值为False 字段类型概括 AutoField xff1a 一个根据实际ID自动增长的In
  • 自学记录--django模板使用记录

    模板template相关知识及问题 xff1a 过滤器 xff1a value floatformat gt 不给参数的话会将浮点数的小数位舍入到一个小数位 例 xff1a value 61 34 256 gt 结果为34 3 value
  • 自学记录--django+uwsgi+nginx部署

    一 xff1a 服务器环境配置 在本地的虚拟环境中 xff0c 项目根目录下 xff0c 执行命令收集所有包 pip freeze gt plist txt 通过xftp软件将开发好的项目和收集的包上传到服务器某个目录在服务器上面安装并创建
  • 赛码-三分线-java

    题目描述 小赛很喜欢看A队和B队的篮球比赛 众所周知 xff0c 篮球每回合根据投篮远近可以得2分或3分 如果投篮距离小于d那么得2分 xff0c 大于等于d得3分 我们将d记为三分线 每次小赛都喜欢通过改变三分线的大小来让自己支持的A队获
  • xrdp远程登录恢复上一次登陆会话

    本文参考Xrdp Tip How to reconnect to the existing session while using the xrdp package from Ubuntu Repository 首先连接远程桌面是观察连接时
  • 查看Debian版本

    查看Debian版本信息命令如下 xff1a lsb release a 参考运行截图 xff1a
  • Linux 服务器安装配置 TimeMachine

    Linux 服务器安装配置 TimeMachine 1 安装 Time Machine 相关的后台服务 1 安装netatalk xff1a apt get install netatalk 2 安装 dbus xff1a apt get
  • Linux命令总结--特殊符号命令

    Linux中特殊符号大全 井号 comments 管理员 普通用户 脚本中 bin bash bin sh 井号也常出现在一行的开头 xff0c 或者位于完整指令之后 xff0c 这类情况表示符号后面的是注解文字 xff0c 不会被执行 T
  • ZeroMQ学习笔记(5)——高级发布订阅模式

    第五章 高级发布订阅模式 何时使用发布 订阅模式 如何处理过于慢速的订阅者 xff08 自杀蜗牛模式 xff09 如何设计高速订阅者 xff08 黑盒模式 xff09 如何监控一个发布 订阅网络 xff08 特浓咖啡模式 xff09 如何建