如何在与主程序不同的线程中编写套接字服务器(使用 gevent)?

2023-12-12

我正在开发一个 Flask/gevent WSGIserver Web 服务器,它需要使用 XML 通过两个套接字与硬件设备进行通信(在后台)。

一个套接字由客户端(我的应用程序)启动,我可以向设备发送 XML 命令。设备在不同的端口上应答并发回我的应用程序必须确认的信息。所以我的应用程序必须监听第二个端口。

到目前为止,我已经发出命令,打开第二个端口作为服务器,等待设备的响应并关闭第二个端口。

问题是设备可能会发送多个我必须确认的响应。所以我的解决方案是保持端口打开并继续响应传入的请求。然而,最终设备完成发送请求,而我的应用程序仍在侦听(我不知道设备何时完成),从而阻止了其他一切。

这似乎是线程的完美用例,因此我的应用程序在单独的线程中启动侦听服务器。因为我已经使用 gevent 作为 Flask 的 WSGI 服务器,所以我可以使用 greenlet。

问题是,我一直在寻找此类事情的一个很好的示例,但我所能找到的只是单个套接字服务器的多线程处理程序的示例。我不需要处理套接字服务器上的大量连接,但我需要在单独的线程中启动它,以便它可以侦听和处理传入消息,同时我的主程序可以继续发送消息。 我遇到的第二个问题是在服务器中,我需要使用“主”类中的一些方法。作为 Python 的新手,我不确定如何构建它以使其成为可能。

class Device(object):

def __init__(self, ...):
    self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def _connect_to_device(self):
    print "OPEN CONNECTION TO DEVICE"
    try:
        self.clientsocket.connect((self.ip, 5100))
    except socket.error as e:
        pass

def _disconnect_from_device(self):
    print "CLOSE CONNECTION TO DEVICE"
    self.clientsocket.close()

def deviceaction1(self, ...):
    # the data that is sent is an XML document that depends on the parameters of this method.
    self._connect_to_device()
    self._send_data(XMLdoc)
    self._wait_for_response()
    return True

def _send_data(self, data):
    print "SEND:"
    print(data)
    self.clientsocket.send(data)

def _wait_for_response(self):
    print "WAITING FOR REQUESTS FROM DEVICE (CHANNEL 1)"
    self.serversocket.bind(('10.0.0.16', 5102))
    self.serversocket.listen(5)                         # listen for answer, maximum 5 connections
    connection, address = self.serversocket.accept()
    # the data is of a specific length I can calculate
    if len(data) > 0:
        self._process_response(data)
        self.serversocket.close()

def _process_response(self, data):
    print "RECEIVED:"
    print(data)
    # here is some code that processes the incoming data and
    # responds to the device
    # this may or may not result in more incoming data



if __name__ == '__main__':
    machine = Device(ip="10.0.0.240")
    Device.deviceaction1(...)

这就是(在全球范围内,我遗漏了敏感信息)我现在正在做的事情。正如你所看到的,一切都是有顺序的。 如果任何人都可以提供一个单独线程中的侦听服务器的示例(最好使用 greenlet)以及从侦听服务器返回到生成线程的通信方式,这将有很大帮助。

Thanks.

EDIT:尝试了几种方法后,我决定使用Python默认的select()方法来解决这个问题。这有效,所以我关于线程使用的问题不再相关。感谢那些为您的时间和精力提供意见的人们。


希望它可以提供一些帮助,在示例类中如果我们会调用tenMessageSender函数然后它将启动一个异步线程而不阻塞主循环然后_zmqBasedListener将开始侦听单独的端口,直到该线程处于活动状态。以及我们的任何消息tenMessageSender函数将发送,这些将被客户端接收并响应zmqBasedListener.

服务器端

import threading
import zmq
import sys

class Example:
    def __init__(self):
        self.context = zmq.Context()
        self.publisher = self.context.socket(zmq.PUB)
        self.publisher.bind('tcp://127.0.0.1:9997')
        self.subscriber = self.context.socket(zmq.SUB)
        self.thread = threading.Thread(target=self._zmqBasedListener)

    def _zmqBasedListener(self):
        self.subscriber.connect('tcp://127.0.0.1:9998')
        self.subscriber.setsockopt(zmq.SUBSCRIBE, "some_key")
        while True:
            message = self.subscriber.recv()
            print message
        sys.exit()

    def tenMessageSender(self):
        self._decideListener()
        for message in range(10):
            self.publisher.send("testid : %d: I am a task" %message)

    def _decideListener(self):
        if not self.thread.is_alive():
            print "STARTING THREAD"
            self.thread.start()

Client

import zmq
context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9997')
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9998')
subscriber.setsockopt(zmq.SUBSCRIBE, "testid")
count = 0
print "Listener"
while True:
    message = subscriber.recv()
    print message
    publisher.send('some_key : Message received %d' %count)
    count+=1

您可以使用 greenlet 等代替线程。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在与主程序不同的线程中编写套接字服务器(使用 gevent)? 的相关文章

  • setColumnStretch 和 setRowStretch 如何工作

    我有一个使用构建的应用程序PySide2它使用setColumnStretch用于柱拉伸和setRowStretch用于行拉伸 它工作得很好 但我无法理解它是如何工作的 我参考了 qt 文档 但它对我没有帮助 我被困在括号内的两个值上 例如
  • Django 营业时间

    我想添加诊所的营业时间 我已经对此进行了调查在 Django 中实现 开放时间 的任何现有解决方案 https stackoverflow com questions 8128143 any existing solution to imp
  • 用于读取类似 CSV 行的 Python 正则表达式

    我想解析传入的类似 CSV 的数据行 值用逗号分隔 逗号周围可能有前导和尾随空格 并且可以用 或 引用 例如 这是有效的行 data1 data2 data3 data4 data5 但这是格式错误的 data1 data2 da ta3
  • TemplateSyntaxError:“settings_tags”不是有效的标签库

    当我尝试运行此测试用例时 出现此错误 这是在我的 django 应用程序的tests py 中编写的 def test accounts register self self url http royalflag com pk accoun
  • 如何搜索一列并用找到的内容填充另一列?

    我有一个带有虚构人物数据的大熊猫数据框 下面是一个小例子 每个人都由一个数字定义 import pandas as pd import numpy as np df pd DataFrame Number 5569 3385 9832 64
  • 让 python 脚本打印到终端而不作为标准输出的一部分返回

    我正在尝试编写一个返回值的 python 脚本 然后我可以将其传递给 bash 脚本 问题是我想要在 bash 中返回一个单一值 但我想要一些东西一路打印到终端 这是一个示例脚本 我们称之为 return5 py usr bin env p
  • 使用 Poetry 创建的 Python 项目:如何在 Visual Studio Code 中调试它?

    我有一个根据基本 Poetry 创建的 Python 项目指示 https python poetry org docs basic usage 项目文件夹是这样的 my project my project my project py F
  • 如何同时有效地运行多个 Pytorch 进程/模型? Traceback:分页文件太小,无法完成此操作

    背景 我有一个非常小的网络 我想用不同的随机种子进行测试 该网络几乎只使用了我的 GPU 计算能力的 1 因此理论上我可以同时运行 50 个进程来同时尝试许多不同的种子 Problem 不幸的是我什至无法在多个进程中导入 pytorch 当
  • 使用 pthread_cond_signal 优雅地终止线程被证明是有问题的

    我需要发射一堆线程 并希望优雅地将它们拉下来 我正在尝试使用pthread cond signal pthread cond wait实现这一目标 但遇到了问题 这是我的代码 首先是thread main static void thrma
  • python win32com.client 调整窗口大小

    我正在使用 Python 3 4 1 通过 win32com client 控制 Windows 应用程序 我可以激活它 我可以发送击键 点击等 现在我想知道是否有办法调整窗口大小并将其设置到特定位置 我找不到方法 这里有一些代码片段 所以
  • Flask 和 Reactjs 抛出 JSX 转换错误

    我已经开始将 ReactJS 与 Python Flask 后端结合使用 通过 Flask 渲染模板时 我在 Chrome 控制台中收到以下客户端错误 错误 找不到模块 jstransform visitors es6 templates
  • Python:处理图像并保存到文件流

    我需要使用 python 处理图像 应用过滤器和其他转换 然后使用 HTTP 将其提供给用户 现在 我正在使用 BaseHTTPServer 和 PIL 问题是 PIL 无法直接写入文件流 因此我必须写入临时文件 然后读取该文件 以便将其发
  • Parallel.ForEach - 优雅取消

    关于等待任务完成和线程同步的主题 我目前有一个迭代 我已将其包含在 Parallel ForEach 中 在下面的示例中 我在评论中提出了一些关于如何最好地处理循环的优雅终止的问题 NET 4 0 private void myFuncti
  • 哈希 freezeset 与排序元组

    在 Python 中 给定一组可比较的 可散列的元素s 散列是否更好frozenset s or tuple sorted s 这取决于你在做什么 创建一个更快frozenset 比排序tuple but frozenset占用的内存比tu
  • 在基本 Tensorflow 2.0 中运行简单回归

    我正在学习 Tensorflow 2 0 我认为在 Tensorflow 中实现最基本的简单线性回归是一个好主意 不幸的是 我遇到了几个问题 我想知道这里是否有人可以提供帮助 考虑以下设置 import tensorflow as tf 2
  • 我应该在哪里对对象和字段进行 django 验证?

    我正在创建一个 Django 应用程序 它使用 Django Rest Framework 和普通的 django views 作为用户的入口点 我想对模型的独立字段以及整个对象进行验证 例如 字段 根据正则表达式函数输入的车牌是否正确 与
  • 从函数在 python 3 中创建全局变量

    我想知道为什么在函数结束后我无法访问变量 variable for raw data 代码是这样的 def htmlfrom Website URL import urllib request response urllib request
  • 如何将另一整列作为参数传递给 pandas fillna()

    我想用另一列中的值填充一列中的缺失值 使用fillna方法 我读到循环遍历每一行将是非常糟糕的做法 最好一次完成所有事情 但我不知道如何使用fillna 之前的数据 Day Cat1 Cat2 1 cat mouse 2 dog eleph
  • 从 HDF5 文件中删除信息

    我意识到 SO 用户以前曾问过这个问题question https stackoverflow com questions 1124994 removing data from a hdf5 file rq 1但它是在 2009 年被问到的
  • nltk 标记化和缩写

    我用 nltk 对文本进行标记 只是将句子输入到 wordpunct tokenizer 中 这会拆分缩写 例如 don t 到 don t 但我想将它们保留为一个单词 我正在改进我的方法 以实现更精确的文本标记化 因此我需要更深入地研究

随机推荐

  • chrome扩展弹出窗口无法通过ID找到元素

    我知道类似的问题已经被问过很多次了 但我还没有找到适合我的解决方案 我的问题很简单 我想做的就是测试 popup html 上的操作 因为在这里 我在弹出窗口上有一个单击按钮 当我单击它时 我想显示警报 但什么也没发生 它没有找到该元素 我
  • Flutter添加ScrollView和背景图片

    您好 我正在尝试将 ScollView 添加到我的应用程序中 但问题是我不能同时拥有 ScrollView 和背景图像 所以如果有人可以帮助我 这里我放置了背景图像 那么我现在如何放置滚动视图 我有两个工作 但不是同时工作 我使用的滚动视图
  • 启动多个线程并重新启动它们

    我正在尝试编写一个系统 在其中创建 x 个工作线程 这些线程将在不同的时间完成它们的工作 当它们中的任何一个完成工作时 我将检查它们的输出并再次重新启动它们 将运行的线程数保持在 x 左右 我将进行多次任意迭代 因此 基本上控制器线程将启动
  • 如何在spark结构化流连接中选择最新记录

    我使用的是spark sql 2 4 x版本 datastax spark cassandra connector用于Cassandra 3 x版本 和卡夫卡一起 我有货币样本的汇率元数据如下 val ratesMetaDataDf Seq
  • php 中的字符串到压缩流

    我有一个带有数据库的处理服务器和一个服务数据库 以较低的带宽成本提供文件 在处理服务器上 php 无法创建文件 因此一切都必须通过流完成和 或保留在内存中 然后才能发送到另一台服务器进行下载 几天前 我发现了 php memory 的流抽象
  • 如何在 ggplotly() 中使分组热图的列宽全部相同

    我有这个数据框 gene symbol lt c DADA SDAASD SADDSD SDADD ASDAD XCVXCVX EQWESDA DASDADS SDASDASD DADADASD sdaadfd DFSD SADADDAD
  • 为什么 SQLSTATE[HY000]: 一般错误?

    这是一个用于注册组长及其伙伴的代码
  • 用于年龄验证的正则表达式,仅使用 Javascript 接受 0-200 之间的年龄

    我想要一个允许用户仅输入 0 到 200 之间的数字的正则表达式 我已经尝试过这个但它不起作用 var age regex S 0 9 0 3 您可以比较数值本身 而不是正则表达式 var ageValue 50 get the input
  • Matplotlib:外部图例,分布在多个子图中

    我有一个包含 2 个子图的图 所有子图共享相应的图表 即相同颜色的相同标签 我想在图的顶部有一个图例 延伸到两个子图 类似于下面的代码 import numpy as np import matplotlib pyplot as plt x
  • 请求标头中的 JWT 与接收 .Net Core API 时不一样

    当我从 Angular 应用程序向 Net Core 2 API 发出请求时 JWT 与请求标头中发送的 JWT 不同 启动 cs public class Startup public Startup IHostingEnvironmen
  • 从字符串中分割特殊字符和字母

    我有一个字符串值 我包含字母 特殊字符 数字和空格的组合 但我只想检索数字 my code Dim str1 As String 123456habAB Dim str2 As String Regex Replace str1 gt
  • 双截断输出的 7 个字符

    double fat 0 2654654645486684646846865584656566554566556564654654899866223625564668186456564564664564 cout lt
  • 无法删除被授予连接数据库的角色

    我正在使用 PostgreSQL 10 4 我发现了一个奇怪的行为 如果我们创建一个角色并将其授予CONNECT数据库 CREATE ROLE dummy GRANT CONNECT ON DATABASE test TO dummy 那么
  • 引用方法内的对象时出现问题

    internal class Program public class Creature public int health public int damage public int coins public static void Hit
  • 如何将现有的 iPhone 应用程序移植到 iPad

    我有一个 iPhone 应用程序 现在我想将该应用程序转换为可在所有 iPhone iPod iPad 设备上运行的通用应用程序 那么 从哪里开始 我需要做哪些事情呢 任何帮助 链接 示例应用程序 任何东西 都将受到高度赞赏 提前致谢 我最
  • iPhone OS 应用程序的可用内存

    是否有函数或常量定义 iPhone OS 中应用程序的可用内存量 我正在寻找一种独立于设备 iPod touch iPhone iPad 的方式来了解应用程序还剩多少内存 该函数将返回可用内存 以字节为单位 import
  • 如何定义二维数组?

    我想定义一个没有初始化长度的二维数组 如下所示 Matrix 但这给出了一个错误 IndexError 列表索引超出范围 从技术上讲 您正在尝试为未初始化的数组建立索引 在添加项目之前 您必须首先使用列表初始化外部列表 Python 称之为
  • 符号函数矩阵

    我想在 Matlab 中定义一个符号函数 而不是变量 矩阵 在工作区中 我希望它成为大小为 N M 的类 symfun 的元素 其中N and M是正整数 你不能创建一个矩阵symfun类元素 可能出于同样的原因无法创建函数句柄矩阵 但您可
  • 如何在 PHP 中发送 HTTP 请求并检索响应(通过标头微调)?

    我必须向 URL 发送 HTTP 请求并检索响应和标头 我不仅对页面内容感兴趣 而且对所有标题也感兴趣 最佳解决方案是什么 插座 PEAR 库不可访问 PHP 配置不可编辑 你应该使用curl 文档中的快速示例
  • 如何在与主程序不同的线程中编写套接字服务器(使用 gevent)?

    我正在开发一个 Flask gevent WSGIserver Web 服务器 它需要使用 XML 通过两个套接字与硬件设备进行通信 在后台 一个套接字由客户端 我的应用程序 启动 我可以向设备发送 XML 命令 设备在不同的端口上应答并发