Akka HTTP 连接池在几个小时后挂起

2024-04-24

我有一个 HTTP 连接池,在运行几个小时后挂起:

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
      .via(pool).toMat(Sink.foreach {
        case ((Success(res), p)) => p.success(res)
        case ((Failure(e), p)) => p.failure(e)
      })(Keep.left).run
  }

我将项目排队:

private def enqueue(uri: Uri): Future[HttpResponse] = {
    val promise = Promise[HttpResponse]
    val request = HttpRequest(uri = uri) -> promise

    queue.offer(request).flatMap {
      case Enqueued => promise.future
      case _ => Future.failed(ConnectionPoolDroppedRequest)
    }
}

并像这样解决响应:

private def request(uri: Uri): Future[HttpResponse] = {
    def retry = {
      Thread.sleep(config.dispatcherRetryInterval)
      logger.info(s"retrying")
      request(uri)
    }

    logger.info("req-start")
    for {
      response <- enqueue(uri)

      _ = logger.info("req-end")

      finalResponse <- response.status match {
        case TooManyRequests => retry
        case OK => Future.successful(response)
        case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
      }
    } yield finalResponse
}

如果 Future 成功,则该函数的结果总是会被转换:

def get(uri: Uri): Future[Try[JValue]] = {
  for {
    response <- request(uri)
    json <- Unmarshal(response.entity).to[Try[JValue]]
  } yield json
}

一切工作正常一段时间,然后我在日志中看到的只是 req-start 而没有 req-end。

我的akka​​配置是这样的:

akka {
  actor.deployment.default {
    dispatcher = "my-dispatcher"
  }
}

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"

  fork-join-executor {
    parallelism-min = 256
    parallelism-factor = 128.0
    parallelism-max = 1024
  }
}

akka.http {
  host-connection-pool {
    max-connections = 512
    max-retries = 5
    max-open-requests = 16384
    pipelining-limit = 1
  }
}

我不确定这是配置问题还是代码问题。我的并行度和连接数如此之高,因为没有它,我的请求/秒速率非常差(我想尽可能快地请求 - 我有其他速率限制代码来保护服务器)。


您没有消耗从服务器返回的响应实体。引用以下文档:

消耗(或丢弃)请求的实体是强制性的!如果 意外地留下既没有消耗也没有丢弃 Akka HTTP 将假设 传入的数据应保持背压,并将停止 通过 TCP 反压机制传入数据。客户应该 无论 HttpResponse 的状态如何,都会使用实体。

该实体以以下形式出现:Source[ByteString, _]需要运行它以避免资源匮乏。

如果您不需要读取实体,则使用实体字节的最简单方法是丢弃它们,方法是使用

res.discardEntityBytes()

(您可以通过添加 - 例如 - 来附加回调.future().map(...)).

文档中的此页面 http://doc.akka.io/docs/akka-http/10.0.0/scala/http/implications-of-streaming-http-entity.html描述了所有替代方案,包括如何在需要时读取字节。

--- EDIT

提供更多代码/信息后,很明显资源消耗不是问题。此实现中还有另一个大危险信号,即Thread.sleep在重试方法中。 这是一个阻塞调用,很可能会导致底层 Actor 系统的线程基础设施处于饥饿状态。

详细解释了为什么这是危险的docs http://doc.akka.io/docs/akka-http/10.0.3/scala/http/handling-blocking-operations-in-akka-http-routes.html.

尝试改变它并使用akka.pattern.after (docs http://doc.akka.io/docs/akka/current/scala/futures.html#After)。下面的例子:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka HTTP 连接池在几个小时后挂起 的相关文章

随机推荐

  • 什么是 AABB - 碰撞检测?

    嗨 我正在制作一个体素游戏Java在研究我需要学习的所有不同东西时 我注意到很多游戏都使用AABB用于碰撞检测 然后我记得看到AABB在 我的世界 中也有 但是当我用谷歌搜索什么时AABB也就是说 它只是提出了其他人的代码 或者历史书上的某
  • Exchange Web 服务托管 API:访问其他用户项目

    是否可以访问除登录用户之外的其他 Exchange 帐户的文件夹和项目 我可以通过 Exchange Web 服务托管 API 执行此操作吗 是的 这是可能的 但您应该知道其他用户的密码或以某种方式获取此凭据 NetworkCredenti
  • 如何使用 @ngrx/data 自定义我的减速器?

    我正在学习使用 ngrx data 确实 使用该库我推进了很多代码 但在个性化它时我遇到了问题 我已经了解了如何将字段添加到集合中 export const entityMetadata EntityMetadataMap User add
  • iframe 不适用于 iOS(离子框架)

    我为 iOS 和 Android 开发了一个带有 ionic 框架的应用程序 安卓上没有问题 但是 尽管 iframe 在 Android 和浏览器中运行良好 但它在 iOS 设备和模拟器中没有显示任何内容 请帮我 尝试将其添加到您的 co
  • C 中浮点数的比较

    我有一个double打印为0 000000我试图将其与0 0f 不成功 为什么这里有区别呢 确定双精度数是否为零的最可靠方法是什么 确定它是否足够接近零以将其打印为0 000000精确到小数点后六位 例如 fabs d lt 0 00000
  • Google Maps API 多边形文物在缩放上显示

    我有一个使用 JS API 构建的 Google 地图 其中包含英国不同地区的多边形 在 Chrome 已测试 v58 和 v60 上 当用户使用滚轮放大或缩小时 多边形的随机伪像会卡在地图上 进一步缩放可以消除它们或显示不同的伪影 在 F
  • 在虚拟机共享目录上运行“grunt”

    将 Vagrant 与 Windows 主机和 Linux 客户机一起使用 grunt尝试运行作业时返回以下错误 据我了解 在主机 来宾共享目录中 此文件路径超出了 Windows 的 255 个字符限制 npm ERR Error EPE
  • 如何将 UIPopoverView 显示为地图视图的注释? (iPad)

    在 iPad 的地图应用程序上 当您点击图钉时 您会看到带有 i 的普通注释 而不是公开指示符 进一步点击 i 会显示一个像这样的弹出视图控制器 有没有办法可以轻松实现这一目标 首先在地图上添加注释viewForAnnotation方法 设
  • 如何在 PyCharm 中自动运行 Main.py

    我更喜欢在我的项目中使用类似 Java 的组织 例如 每个班级都有不同的 py文件和类Main 其中仅包含 main 函数 每次我对一个类进行更改时 我都需要单击 Main 并运行代码 Ctrl Shift F10 从那里 我该如何定义我的
  • Rails 计数器缓存与计算

    我有一个想要显示总计的集合 其想法是为我需要的每个总计使用缓存 然而 我还需要深入研究数据集 所以很可能无论如何我都必须加载该集合 那么我应该仍然使用缓存还是只使用计算 正如斯蒂芬 奥唐纳 Stephen ODonnell 所说 取决于藏品
  • 如何使用 Kotlin 在 Android 中压缩照片

    现在这是我从智能手机拍照的代码 问题是图像非常大 我想压缩它 有一些帮助或想法吗 谢谢 你的 信息 执行 abrirCamara 方法 private fun abrirCamara val values ContentValues val
  • Facebook 帖子,图片托管在公共 CDN 上

    Facebook 似乎屏蔽了我引用的任何由 CDN 支持的图像 这是真的 有办法解决这个问题吗 是否有文档说明您可以在帖子的 图片 参数中引用哪些内容以及不能引用哪些内容 是的 Facebook 禁止您在帖子 分享等中使用其 CDN 中的图
  • 如何将Stripe支付集成到Yii2中?

    我有以下代码 它运行时没有错误 但它不会将资金插入到 Stripe 服务器上 Stripe 库已正确安装 配置文件
  • BERT - 池化输出与序列输出的第一个向量不同

    我在 Tensorflow 中使用 BERT 有一个细节我不太明白 根据文档 https tfhub dev google bert uncased L 12 H 768 A 12 1 https tfhub dev google bert
  • 如何从 Objective-C 中的方法返回 C 数组?

    我有一个返回变量的函数 我想知道如何返回一个数组 问题是它不是 NSArray 它只是一个像这样的平均 C 数组 b2Fixture addFixturesToBody b2Body body forShapeName NSString s
  • 在CXF中使用javax.ws.rs.client.ClientBuilder创建客户端,任何路由都能够使用本地传输?

    我正在开发一个使用标准的代码库 javax ws rs client ClientBuilder 类 来自 CXF 发行版 用于配置和创建 javax ws rs client Client 这已经足够好了 我现在正在尝试编写使用的测试JA
  • 如何使用 gnu cp 命令将文件复制到多个目录

    是否可以使用 cp 命令将单个文件复制到多个目录 我尝试了以下方法 但没有成功 cp file1 foo bar cp file1 foo bar 我知道可以使用 for 循环或 find 但是可以使用 gnu cp 命令吗 你不能这样做c
  • 将字符串中第 N 次出现的字符替换为其他字符

    考虑a paste 1 10 collapse 这导致 a 1 2 3 4 5 6 7 8 9 10 我想替换每第 n 次 比如第 4 次 出现的 并将其替换为其他内容 比如 n 期望的输出是 1 2 3 4 n 5 6 7 8 n 9 1
  • PHP CodeIgniter 框架中的命名空间

    CodeIgniter 支持命名空间吗 如何让命名空间在 Codeigniter 中工作 实际上 您可以让命名空间与应用程序模型中的相对路径结合使用 此修改使加载模型变得更加容易 并且还允许您拥有接口 将其添加到 application c
  • Akka HTTP 连接池在几个小时后挂起

    我有一个 HTTP 连接池 在运行几个小时后挂起 private def createHttpPool host String SourceQueue HttpRequest Promise HttpResponse val pool Ht