如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信

2023-12-26

我目前正在开发一个项目,该项目需要外部系统和我将编写的应用程序(用Java)之间的TCP 通信。众所周知,使用常规 NIO 可以轻松实现这一点。然而,作为我正在开发的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请参考下图:

在右侧,我的应用程序作为 TCP 服务器运行,等待来自左侧外部系统的连接。我读过要创建 TCP 并侦听连接,您只需执行以下操作:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});

然而,我不明白的是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。

我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行反应式编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。


要从套接字读取数据,您可以使用RecordParser。在套接字连接上,数据通常由换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);

A RecordParser是一个 Vert.xReadStream所以它可以转化为Flowable:

FlowableHelper.toFlowable(parser)

现在如果一个EchoRequestMessage可以从创建Buffer:

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }

And an EchoResponseMessage转换为Buffer:

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}

您可以使用 RxJava 运算符来实现回显服务器流程:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);

[编辑] 如果在您的协议消息中不是行分隔而是长度前缀,那么您可以创建自定义ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}

并将其转换为Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信 的相关文章

随机推荐

  • Visual Studio 不会编译带有 *.inl 实现的模板类

    我正在关注一本关于 SFML 游戏开发的书 但我被困在第二章 因为我无法编译我刚刚编写的代码 它几乎是从书中逐字复制 除了成员变量名称和异常文本 我有使用 C 和模板的经验 但我以前从未见过这个错误 而且我已经盯着它看了几个小时了 我没有发
  • cv2.waitKey(1) & 0xff == ord('q') 如何工作?

    这条线如何运作 据我所知 到目前为止 输出cv2 waitKey number 对于所有的每一个int数字是 1 and 0xff是一个十六进制数 等于255以十进制数字表示 1 0xff等于255以十进制数字表示 Also ord q 等
  • 在后台线程上构建 UIView

    我知道 UI 应该只在主线程上更新 但是是否可以在单独的线程上创建和添加子视图 只要它们不添加到可见视图中 会导致内存和性能问题吗 这是一些示例代码 NSOperationQueue queue NSOperationQueue alloc
  • 使用 Spring MVC RequestMappingHandlerMapping 和 Spring Websocket 的 ServletWebSocketHandlerRegistry 处理相同的 URL

    我想要拥有什么 客户端发送GET HTTP 1 1 没有Connection upgrade 该请求应由RequestMappingHandlerMapping 客户端发送Connection upgrade与 GET 请求一起 该请求应该
  • Oracle SQL:为什么我的函数输出 null?

    CREATE OR REPLACE FUNCTION get status by member id p member id NUMBER RETURN CHAR AS v status CHAR 1 BEGIN select status
  • C++ 指针和指向引用的指针

    我正在尝试创建一个二叉搜索树 我使用递归过程将节点插入树中 代码如下 void BST insertRoot Node node int data if node NULL this gt root new Node data else i
  • 使用依赖规则匹配的方面意见提取中的命名实体识别

    使用 Spacy 我根据我定义的语法规则从文本中提取方面意见对 规则基于 POS 标签和依赖标签 通过以下方式获得token pos and token dep 下面是其中一项语法规则的示例 如果我通过了判决Japan is cool 它返
  • 部署到 SharePoint 2010 网站的 WCF 服务出现“EndPoint Not Found”错误

    我正在尝试利用自动完成扩展器 questions tagged autocompleteextender来自ajax控制工具包 questions tagged ajaxcontroltoolkit在我的一个共享点 questions ta
  • 如何从页脚中删除“订单和退货”?

    我已将新的 Magento 1 5 0 1 安装更新为 Magento 1 6 0 0 现在页脚中有一个链接 订单和退货 我 尚 不知道如何删除它 我无法从核心文件中删除它 我已经尝试过 XML 方法 但似乎不起作用 可能是我的错 目前 我
  • 如果未选择下拉列表,则不会在提交时发送 GET 变量

    我有一个表单 用于过滤仅包含下拉列表的搜索结果 我使用 GET 而不是 post 以便可以轻松地通过 URL 共享结果
  • 将字符串转换为Java代码

    我遇到了一个奇怪的案例 在给定的数据库中 我得到了一条记录VARCHAR字段 所以在我的实体模型中我添加了这个字段 以及 getter 和 setter 现在是乐趣开始的时刻 下面的字符串实际上是方法的主体 它看起来像这样 if score
  • 尝试修改不可创建的数组值

    我没有 Perl 经验 并且很难解决此错误 任何帮助表示赞赏 脚本如下 第40行报告问题 粗体 usr bin perl print Please enter filename without extension input lt gt c
  • 在设备上构建失败,退出代码为 1

    我成功构建了一个应用程序 进行一些更改后 在模拟器上构建和运行仍然按预期正常工作 但在我的物理设备上构建和运行却意外失败 PhaseScriptExecution CP Embed Pods Frameworks Users olivera
  • 如何获取 numpy / scipy 中特定百分位的索引?

    我看过这个答案 https stackoverflow com a 2374662 391161它解释了如何计算特定百分位数的值 以及这个答案 https stackoverflow com a 12414469 391161它解释了如何计
  • 在没有 GLUT 的情况下初始化 OpenGL

    我能找到的每个介绍和示例似乎都使用 GLUT 或其他一些框架来 初始化 OpenGL 有没有一种方法可以仅使用 GL 和 GLU 中可用的内容来初始化 OpenGL 如果不是 那么 GLUT 正在做什么 如果没有它就不可能实现 正如卢克所指
  • 标准化矩阵 python 的行

    给定 python 中的二维数组 我想用以下规范标准化每一行 Norm 1 L 1 Norm 2 L 2 标准信息 L Inf 我已经开始这段代码 from numpy import linalg as LA X np array 1 2
  • Elastic Search版本冲突问题

    我正在使用弹性搜索来进行搜索 但最近我观察到在将数据添加到弹性搜索时出现一些随机错误 版本冲突 需要 seqNo 113789 主要术语 19 当前文档有 seqNo 113797 和主要术语 19 上述类型错误是随机出现的 我无法在弹性搜
  • ADO 和 DAO 的区别

    这不是一个更好的问题 而是一个关于为什么它们在功能上不同的问题 我遇到的问题已得到解决 但我很好奇为什么会发生这种行为 背景 使用 Excel vba 从 Access 数据库中提取数据 当用户单击按钮时 将从 Access 中提取记录集
  • 每个函数名称后面的@n(“at 符号”)是什么?

    我正在尝试使用 Netwide Assembler 学习汇编语言 在教程中 我看到有一个 n在每个函数名称的末尾 例如 CALL GetStdHandle 4 CALL WriteFile 20 CALL ExitProcess 4 这是做
  • 如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信

    我目前正在开发一个项目 该项目需要外部系统和我将编写的应用程序 用Java 之间的TCP 通信 众所周知 使用常规 NIO 可以轻松实现这一点 然而 作为我正在开发的这个新项目的一部分 我必须使用 Vert x 来提供 TCP 通信 请参考