我有一个类,我希望通过 python 中的 SimpleXMLRPCServer 进行测试。我设置单元测试的方法是创建一个新线程,并在其中启动 SimpleXMLRPCServer。然后我运行所有测试,最后关闭。
这是我的服务器线程:
class ServerThread(Thread):
running = True
def run(self):
self.server = #Creates and starts SimpleXMLRPCServer
while (self.running):
self.server.handle_request()
def stop(self):
self.running = False
self.server.server_close()
问题是,如果线程已经在等待handle_request中的请求,那么调用ServerThread.stop(),然后调用Thread.stop()和Thread.join()将不会导致线程正确停止。而且由于这里似乎没有任何我可以使用的中断或超时机制,因此我不知道如何干净地关闭服务器线程。
我遇到了同样的问题,经过几个小时的研究,我通过从使用自己的解决方案切换来解决了这个问题处理请求()循环到永远服务()启动服务器。
永远服务()启动像你一样的内部循环。这个循环可以通过调用来停止关闭()。停止循环后可以停止服务器服务器关闭().
我不知道为什么这会起作用处理请求()循环不,但它确实;P
这是我的代码:
from threading import Thread
from xmlrpc.server import SimpleXMLRPCServer
from pyWebService.server.service.WebServiceRequestHandler import WebServiceRquestHandler
class WebServiceServer(Thread):
def __init__(self, ip, port):
super(WebServiceServer, self).__init__()
self.running = True
self.server = SimpleXMLRPCServer((ip, port),requestHandler=WebServiceRquestHandler)
self.server.register_introspection_functions()
def register_function(self, function):
self.server.register_function(function)
def run(self):
self.server.serve_forever()
def stop_server(self):
self.server.shutdown()
self.server.server_close()
print("starting server")
webService = WebServiceServer("localhost", 8010)
webService.start()
print("stopping server")
webService.stop_server()
webService.join()
print("server stopped")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)