如何使用Akka-HTTP客户端websocket发送消息

2023-11-25

我正在按照以下文档尝试客户端 WebsocketwebSocket客户端流.

示例代码是:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

连接升级后,如何使用connection向 websocket 服务器端发送消息?

我从文档中注意到:

此方法返回的 Flow 只能实现一次。对于每个请求,必须通过再次调用该方法来获取新的流。

仍然很困惑,既然升级后的连接已经准备好了,为什么我们需要多次构建流程。


您可以创建基于参与者的源并通过已建立的 Websocket 连接发送新消息。

    val req = WebSocketRequest(uri = "ws://127.0.0.1/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message, ActorRef] = 
         Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    val messageSink: Sink[Message, NotUsed] =
        Flow[Message]
            .map(message => println(s"Received text message: [$message]"))
            .to(Sink.ignore)

    val ((ws, upgradeResponse), closed) =
        messageSource
            .viaMat(webSocketFlow)(Keep.both)
            .toMat(messageSink)(Keep.both)
            .run()

    val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Future.successful(Done)
        } else {
            throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")

`

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用Akka-HTTP客户端websocket发送消息 的相关文章

  • 如何强制 Spark 执行代码?

    我如何强制 Spark 执行对 map 的调用 即使它认为由于其惰性求值而不需要执行它 我试过把cache 与地图调用 但这仍然没有解决问题 我的地图方法实际上将结果上传到 HDFS 所以 它并非无用 但 Spark 认为它是无用的 简短回
  • 如何处理 Spark 数据框中外连接的数据倾斜

    我有两个数据框 正在对 5 列执行外连接 下面是我的数据集的示例 uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId FinancialS
  • 发送 FakeRequest 时如何为 akka.stream.Materializer 提供隐式值?

    我正在尝试理解下面看到的错误 并学习如何修复它 could not find implicit value for parameter materializer akka Stream Materializer val fut Future
  • 最大模式长度 fpgrowth apache Spark

    我正在尝试使用 Spark Scala 运行关联规则 我首先创建一个 FPGrowth 树并将其传递给关联规则方法 但是 我希望添加最大模式长度参数 以限制我想要在左侧和右侧的项目数量 我只想要项目之间的一对一关联 val model ne
  • 如何在“THEN”中打印“IF”条件的源代码

    我想在 THEN 部分打印 IF 条件的 Scala 源代码 例子 IF 2 2 lt 5 THEN println I am in THEN because sourceCodeOfCondition 现在我们跳过THEN部分 问题是 如
  • 如何对 RDD 进行分区

    我有一个文本文件 其中包含大量由空格分隔的随机浮动值 我正在将此文件加载到 scala 中的 RDD 中 这个RDD是如何分区的 另外 是否有任何方法可以生成自定义分区 以便所有分区都具有相同数量的元素以及每个分区的索引 val dRDD
  • 隐式类中的 Scala 按名称调用构造函数参数

    下面的代码不编译 期望的是在隐式类中有一个按名称调用构造函数参数 如下所示 def f n Int 1 to n product implicit class RichElapsed A val f gt A extends AnyVal
  • 如何在 Scala mutable.Seq 上追加或前置

    Scala 有一些我不明白的地方collection mutable Seq http www scala lang org api current index html scala collection mutable Seq 它描述了所
  • 将类型信息传递给 Scala 中的函数

    我有对 json 对象执行一些常见操作的代码 即提取 所以我想创建一个通用函数 它接受哪个类的类型参数 代码如下所示 def getMessageType T json JValue Either GenericError T try Ri
  • Scala Continuations - 为什么我的转移调用不能位于 try-catch 块内?

    我对 Scala 延续很陌生 而且对一般的 scala 语言也比较陌生 我尝试使用 Scala 延续并编写了以下代码 case class MyException msg String extends Exception def go In
  • 乔达时间:将 UTC 转换为本地时间

    我想将 Joda Time UTC DateTime 对象转换为本地时间 这是一种看似有效的费力方法 但一定有更好的方法 这是没有周围声明的代码 在 Scala 中 val dtUTC new DateTime 2010 10 28T04
  • 为什么Scala语言中的++:运算符这么奇怪?

    我正在使用 运算符来获取两个集合的集合 但是我使用这两种方法得到的结果不一致 scala gt var r Array 1 2 r Array Int Array 1 2 scala gt r Array 3 scala gt r res2
  • 通过 Gradle 进行测试时记录日志

    在测试时 Gradle 似乎将 stdout stderr 重定向到project dir build reports tests index html 有没有办法避免这种重定向 并将内容打印到控制台 附加信息 这是一个 Scala 2 9
  • 将 Scala 文件转换为 Dll

    我有一些使用 IntelliJ 和 SBT Plugin 编写的 scala 代码 并希望将代码作为 C 的 DLL 提供给我 我已经尝试使用 ikvmc 我通过 package 将所有类打包在一个罐子中 之后 我手动设置一个 jar 其中
  • 简单的 Scala actor 问题

    我确信这是一个非常简单的问题 但很不好意思地说我无法理解它 我有一个 Scala 值列表 我想使用演员来并行地对每个值进行一些 外部 调用 我想等到所有值都已处理完毕 然后继续 没有共享值被修改 有人可以建议吗 Thanks Scala 中
  • 《使用 Apache Flink 进行流处理》如何从 IntelliJ 运行书籍代码?

    如中所述这个帖子 https stackoverflow com questions 61043860 how to run first example of apache flink我无法成功运行 使用 Apache Flink 进行流处
  • Spark:用列的平均值替换数据框中的空值

    如何创建 UDF 以编程方式将每列中 Spark 数据框中的空值替换为列平均值 例如 在示例中 数据 col1 空值的值为 2 4 6 8 5 5 5 示例数据 col1 col2 col3 2 null 3 4 3 3 6 5 null
  • Liftweb 环境中的后台任务

    我必须编写守护进程 并且我想使用模型来连接到数据库和一些有用的 Lift 类 是否可以运行 Rails 的 rake 任务的模拟 Scala 社区组上也有类似的问题 答案是使用Actors来做后台处理
  • Scala 性能问题

    In the 丹尼尔 科泽夸 Daniel Korzekwa 撰写的文章 http blog danmachine com 2011 01 moving from java to scala one year html 他说以下代码的性能
  • 哪些 ORM 与 Scala 配合得很好? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi

随机推荐

  • 如何通过 JavaScript 禁用 Chrome 的已保存密码提示设置

    有没有办法借助 JavaScript 或 jQuery 来操作 Chrome 设置 我想使用 JavaScript 禁用保存密码弹出气泡 这个怎么做 现在我将回答我自己的问题 它可以在 chrome 和 mozilla firefox 中完
  • 将 ForEachAsync 与 Action 内的 wait 一起使用时不等待

    以下应该返回 C 但它返回 B using System Data Entity var state A var qry from f in db myTable select f await qry ForEachAsync async
  • 如何告诉 Visual Studio 在出现特定异常时不要中断?

    我有一个特定类型的异常 我希望 Visual Studio 能够处理该异常不继续并显示异常助手屏幕 本质上 我希望它只是让我的正常异常处理基础设施来处理它 该异常是 System Exception 的继承者 我编写了它并拥有其源代码 任何
  • Phonegap - 如何使状态栏变黑?

    非常简单的一个问题 我似乎找不到答案 我如何将 iPhone 状态栏 顶部的细栏 带有接收 电池等 从默认灰色更改为黑色PhoneGapiPhone 应用程序 谢谢 格伦 PhoneGap iPhone 应用程序只是一个常规的 Xcode
  • WPF DataGrid SelectedItem 绑定在项目更改后停止工作

    我的问题 情况非常类似于Wpf DataGrid SelectedItem 在单元格编辑后失去绑定但我没有使用任何 自定义 WPF 框架 我有一个实现的模型INotifyPropertyChanged and IEditableObject
  • 重用异步套接字:后续连接尝试失败

    我试图在异步 HTTP 客户端中重用套接字 但我无法第二次连接到主机 我基本上将异步 HTTP 客户端视为具有以下状态的状态机 可用 插座可供使用 正在连接 套接字正在连接到端点 发送 套接字正在向端点发送数据 正在接收 套接字正在从端点接
  • 为什么书上说“编译器在内存中为变量分配空间”?

    为什么书上说 编译器在内存中为变量分配空间 这不是可执行文件吗 我的意思是 例如 如果我编写以下程序 include
  • 无法循环打开 png 设备

    我一直在摆弄 R 中的一个函数 长话短说 我有一个for loop 在每一步 我使用保存一个图png 然后立即readPNG这样我就可以提取RGB信息 然后我制作第二个情节 然后readPNG这样我就可以比较两个图像的 RGB 问题是我不断
  • Snowflake (LEFT JOIN) LATERAL:无法评估不支持的子查询类型

    横向连接 在 FROM 子句中 LATERAL 关键字允许内联视图引用该内联视图之前的表表达式中的列 横向连接的行为更像是相关子查询 而不是大多数连接 让我们稍微调整一下文档中提供的代码 CREATE TABLE departments d
  • 导入错误:未找到 MagickWand 共享库 [windows]

    早上好 经过多次尝试运行 from wand image import Image 我收到以下错误 Traceback most recent call last File C Users XXXXX PycharmProjects PDF
  • Botframework:如何使用机器人处理长时间运行的任务?

    如何处理机器人上长时间运行的任务 以便客户端不会在 15 秒后再次尝试发送消息 我有一个带有 botframework v3 的机器人 并通过直线连接客户端 The 直达专线通道连接器本身不会重试发送消息 如果它在向您的机器人发送消息后 1
  • 获取 .NET 对象的内存地址 (C#)

    我试图追踪单声道运行时中的一个错误 其中一个变量似乎分配给一个有效对象 然后稍后重新分配给一个虚假对象 特别是 early in code I allocate fine var o new object valid allocation
  • 两个 ddev 项目之间的通信

    我有两个需要相互交互的 ddev 项目 当遇到一些问题时 我会检查连接的已解析 IP 我通过 ssh 进入 project1 并 ping project2 来完成此操作 ping project2 ddev local 域名解析为 127
  • Spring security oauth 2简单示例

    我尝试根据官方教程实现我自己的示例Sparklr2 Tonr2 一切看起来都不错 但是当我从web xml in my Tonr2实现 弹簧安全过滤器我有例外 尚未为当前请求建立重定向 URI 我不明白我应该使用什么 URL 这是我的代码
  • 凹边界半径可以吗?

    这是一个简单的凸示例 http jsfiddle net swY5k test width 200px height 200px background 888888 border radius 50px 但是 我想要一个凹形边界半径 我尝试
  • jQuery .each css 不是一个函数

    我有一个包含 3 个成员的 jQuery 对象 var elements this wrapperName gt ul gt li gt a gt img Object 0 img 1 img 2 img length 3 prevObje
  • 如何将正则表达式转换为字符串文字并再次转换回来?

    我怎么能够 将带有标志的 JavaScript RegExp 转换为字符串文字 想想 JSON 并将该文字转换回正则表达式 例如使用字符串 the weather is nice today var myRe new RegExp weat
  • Android 位置管理器标准

    我需要从网络和 GPS 提供商处接收位置更改 如果 GPS 提供商不可用或没有位置 基于卫星可见性 我将从网络提供商处接收位置 否则从 GPS 提供商处接收位置 是否可以根据我的需要使用标准选择提供商 实际上Android 开发者 让您的应
  • 通过 URL 运行自动化脚本

    马克西莫 7 6 1 1 我想通过调用单独系统中的 URL 来运行 Maximo 自动化脚本 是否有可能做到这一点 这是一个很好的用例 也是我们过去几天一直在努力解决的问题 创建自动化脚本 我的叫做automation api test 使
  • 如何使用Akka-HTTP客户端websocket发送消息

    我正在按照以下文档尝试客户端 WebsocketwebSocket客户端流 示例代码是 import akka actor ActorSystem import akka Done import akka http scaladsl Htt