在 multiprocessing.connection.Listener.accept() 给定时间后引发 TimeOutError

2024-04-28

我正试图打断multiprocessing.connection.Listener.accept(),但迄今为止尚未成功。由于它不提供timeout参数,我想也许我可以使用socket.setdefaulttimeout()打断它,正如帖子中所建议的那样,我再也找不到了,在这里。

这不起作用。然后我尝试打电话close() on the Listener()目的。根据这篇文章的答案 https://stackoverflow.com/questions/25041967/python-interrupt-s-accept,这应该有效。

然而,这些物体似乎并没有与通常的物体一起发挥作用。socket相关解决方案。

我可以确认的是Listener被关闭的是Timer对象符合预期,但是accept()通话不中断。

代码:

import logging
import socket
import os
from multiprocessing.connection import Listener
from queue import Queue, Empty
from threading import Thread, Event, Timer

class Node(Thread):
    """Base Class providing a AF_INET, AF_UNIX or AF_PIPE connection to its
    data queue. It offers put() and get() method wrappers, and therefore
    behaves like a Queue as well as a Thread.

    Data from the internal queue is automatically fed to any connecting client.
    """
    def __init__(self, sock_name, max_q_size=None, timeout=None,
                 *thread_args, **thread_kwargs):
        """Initialize class.

        :param sock_name: UDS, TCP socket or pipe name
        :param max_q_size: maximum queue size for self.q, default infinite
        """
        self._sock_name = sock_name
        self.connector = Listener(sock_name)
        max_q_size = max_q_size if max_q_size else 0
        self.q = Queue(maxsize=max_q_size)
        self._running = Event()
        self.connection_timer = Timer(timeout, self.connection_timed_out)
        super(Node, self).__init__(*thread_args, **thread_kwargs)

    def connection_timed_out(self):
        """Closes the Listener and shuts down Node if no Client connected.

        :return:
        """
        self.connector.close()
        self.join()

    def _start_connection_timer(self):
        self.connection_timer.start()

    def start(self):
        self._running.set()
        super(Node, self).start()

    def join(self, timeout=None):
        print("clearing..")
        self._running.clear()
        print("internal join")
        super(Node, self).join(timeout=timeout)
        print("Done")

    def run(self):
        while self._running.is_set():
            print("Accepting connections..")
            self._start_connection_timer()
            try:
                client = self.connector.accept()
                self.connection_timer.cancel()
                self.feed_data(client)
            except (TimeoutError, socket.timeout):
                continue
            except Exception as e:
                raise
        print("Run() Terminated!")

    def feed_data(self, client):
        try:
            while self._running.is_set():
                try:
                    client.send(self.q.get())
                except Empty:
                    continue
        except EOFError:
            return


if __name__ == '__main__':
    import time
    n = Node('/home/nils/git/spab2/test.uds', timeout=10)
    n.start()
    print("Sleeping")
    time.sleep(15)
    print("Manual join")
    n.join()

我意识到我的问题与此重复question https://stackoverflow.com/questions/38936779/non-blocking-multiprocessing-connection-listener- 然而,它已经快一年了,甚至还没有收到评论。另外,我正在使用Unix Domain Sockets,与 linkedin 帖子相对TCP联系。


我设法通过以下方式设置超时Python 2.7:

self.listener = mpc.Listener((address, port))
self.listener._listener._socket.settimeout(3)

至此,accept()通话中断。

Result:

conn = self.listener.accept()
File "/usr/lib/python2.7/multiprocessing/connection.py", line 145, in accept
c = self._listener.accept()
File "/usr/lib/python2.7/multiprocessing/connection.py", line 275, in accept
s, self._last_accepted = self._socket.accept()
File "/usr/lib/python2.7/socket.py", line 202, in accept
sock, addr = self._sock.accept()
timeout: timed out

问候, 亨利

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 multiprocessing.connection.Listener.accept() 给定时间后引发 TimeOutError 的相关文章

随机推荐