队列上的多路复用。队列?

2024-01-04

我怎样才能“选择”多个queue.Queue http://docs.python.org/py3k/library/queue.html是同时的吗?

Go 语言有所需的功能 http://golang.org/doc/go_spec.html#Select_statements及其渠道:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

其中第一个解除阻塞的通道执行相应的块。我如何在 Python 中实现这一点?

Update0

Per the link http://www.1024cores.net/home/lock-free-algorithms/queues给出tux21b 的回答 https://stackoverflow.com/a/8539910/149482,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列(MPMC)
  • 提供每个生产者 FIFO/LIFO
  • 当队列为空/已满时,消费者/生产者会被阻塞

此外,通道可能会阻塞,生产者将阻塞,直到消费者检索该项目。我不确定Python的队列可以做到这一点。


如果你使用queue.PriorityQueue您可以使用通道对象作为优先级来获得类似的行为:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

输出示例:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

在这个例子中,ChannelManager只是一个包装queue.PriorityQueue实现了select方法作为contextmanager使其看起来类似于selectGo 中的声明。

有几点需要注意:

  • Ordering

    • 在 Go 示例中,通道在内部的写入顺序select语句确定如果有多个通道可用的数据,将执行哪个通道的代码。

    • 在 python 示例中,顺序由分配给每个通道的优先级决定。然而,优先级可以动态分配给每个通道(如示例所示),因此可以使用更复杂的方式更改顺序select方法负责根据方法的参数分配新的优先级。此外,一旦上下文管理器完成,就可以重新建立旧的排序。

  • Blocking

    • 在 Go 示例中,select语句阻塞,如果default案例存在。

    • 在 python 示例中,必须将布尔参数传递给select方法来明确何时需要阻塞/非阻塞。在非阻塞情况下,上下文管理器返回的通道只是字符串'default'所以很容易在里面的代码中检测到这一点with陈述。

  • 线程:对象中queue模块已经准备好用于多生产者、多消费者场景,如示例中所示。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

队列上的多路复用。队列? 的相关文章

随机推荐