如何将 futures::Stream 写入磁盘而不先将其完全存储在内存中?

2024-04-19

这里有一个使用 Rusoto S3 下载文件的示例:如何将从 S3 使用 Rusoto 下载的文件保存到我的硬盘? https://stackoverflow.com/questions/51287360/how-to-save-a-file-downloaded-from-s3-with-rusoto-to-my-hard-drive

问题在于,它看起来像是将整个文件下载到内存中,然后将其写入磁盘,因为它使用write_all https://doc.rust-lang.org/std/io/trait.Write.html#method.write_all方法接受字节数组,而不是流。我怎样才能使用StreamingBody https://rusoto.github.io/rusoto/rusoto_s3/struct.StreamingBody.html,它实现了futures::Stream https://docs.rs/futures/0.1/futures/stream/trait.Stream.html将文件流式传输到磁盘?


Since StreamingBody实施Stream<Item = Vec<u8>, Error = Error>,我们可以构建一个MCVE https://stackoverflow.com/help/mcve这代表着:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

type Error = Box<std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}

然后我们可以以某种方式获得“流媒体主体”并使用Stream::for_each https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.for_each处理中的每个元素Stream。在这里,我们只需调用write_all提供一些输出位置:

use std::{fs::File, io::Write};

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}

然后我们可以编写一些测试主要内容:

fn main() {
    let mut file = Vec::new();

    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }

    assert_eq!(file, b"0123456789ABCDEF");
}

关于这个简单实现的质量的重要注意事项:

  1. 致电给write_all可能会阻塞,您不应该在异步程序中这样做。最好将阻塞工作交给线程池。

  2. 的用法Future::wait强制线程阻塞,直到未来完成,这对于测试来说非常有用,但对于您的实际用例可能不正确。

也可以看看:

  • 在 future-rs 中封装阻塞 I/O 的最佳方法是什么? https://stackoverflow.com/q/41932137/155423
  • 如何在稳定的 Rust 中同步返回在异步 Future 中计算的值? https://stackoverflow.com/q/52521201/155423
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 futures::Stream 写入磁盘而不先将其完全存储在内存中? 的相关文章

随机推荐

  • 从日期中减去时间 - 时刻 js

    例如我有这个日期时间 01 20 00 06 26 2014 我想减去这样的时间 00 03 15 之后我想将结果格式化为这样 3 hours and 15 minutes earlier 我怎样才能做到这一点使用moment js edi
  • useReducer Action 调度两次

    Scenario 我有一个返回操作的自定义挂钩 父组件 Container 利用自定义钩子并将操作作为 prop 传递给子组件 Problem 当从子组件执行操作时 实际调度会发生两次 现在 如果子级直接使用钩子并调用操作 则调度仅发生一次
  • 当我不知道它是否是临时的时,C++ 返回类型

    假设Foo是一个相当大的数据结构 我应该怎样写一个const返回实例的虚函数Foo 如果我不知道继承的类是否会存储Foo内部 因此 允许通过引用返回 如果我无法在内部存储它 我的理解是我无法返回const引用它 因为它将是临时的 它是否正确
  • 如何使用 Laravel + JavaScript 创建搜索过滤器?

    我最近创建了一个 JavaScript 过滤器来过滤产品表中的数据 我有 5 个字段可以输入搜索 它们是 描述 型号 经销商和库存 我将表与另一个视图中的产品分开 并将字段保留在索引中 我需要该表返回我在字段中输入的值 我举了一个用 描述
  • WP7 检查互联网是否可用

    我的应用程序 WP7 未被接受 因为如果互联网不可用 它无法加载 我寻找一种方法来检查它并找到了这个命令 NetworkInterface GetIsNetworkAvailable 但它无法在模拟器上运行 而且我没有任何设备来测试它 有人
  • 在 git-svn 中克隆主干后克隆分支的最佳方法是什么?

    给定一个包含许多分支的大型 Subversion 存储库 我想开始使用git svn通过克隆trunk首先 然后添加特定分支 我看到至少三种方法可以做到这一点 但是其中任何一种都是 官方的 还是有最好的方法 假设以下布局 https svn
  • Delphi 6:在缺少抽象类方法时强制编译器错误?

    我使用的是 Delphi Pro 6 现在 了解类是否缺少基类抽象方法的唯一方法是等待 IDE 发出 包含抽象方法 base class 抽象方法名称 警告或在尝试调用缺少的方法时等待运行时抽象错误方法 前者是不够的 因为它只查找当前项目中
  • 如何在 log4j 中启用包级别日志记录

    谁能告诉我 log4j 中的包级别日志记录是什么 以及如何实现这一点 今天我的面试问题无法回答 即使我在谷歌中也没有找到好的解决方案 太感谢了 包级别日志记录是 log4j 的标准日志记录 使用 log4j 配置 您可以指定包和关联的级别
  • 将 std::wstring 转换为 int

    我认为这非常简单 但我无法让它发挥作用 我只是想将 std wstring 转换为 int 到目前为止我已经尝试了两种方法 第一种是将 C 方法与 atoi 一起使用 如下所示 int ConvertedInteger atoi OrigW
  • 如何让传单地图画布具有 100% 的高度?

    我的传单画布目前如下所示 高度为 700 像素 不过我希望它的高度为 100 以便占据整个空白区域 高度 100 在地图画布的 CSS 属性中不起作用 我找到了一些解决方案 但它们只适用于谷歌地图 有没有人有解决方案 即使这只是一个解决方法
  • 在 Matplotlib 3D 绘图中获取观察/相机角度?

    当我用鼠标旋转 Matplotlib 3D 图时 如何保存视角 相机位置 并在下次运行脚本时使用这些值以编程方式设置视角 TL DR 视角存储在图形的轴对象中 名称为elev and azim 并且视图可以设置为plt gca view i
  • jQuery AJAX 参数未传递给 MVC

    我有点陷入可能是常见的情况 但找不到太多解决方案 我将单个 int 参数传递给 MVC 控制器方法 期望返回 Json 响应 问题是 该参数虽然在客户端填充 但在服务器端无法识别并被解释为空 这是代码 function getBatches
  • 无法在react-native中获取iOS推送通知设备令牌

    我提到这个问题 https stackoverflow com questions 35387227 get device token with react native获取设备令牌以便将推送通知发送到我的应用程序 我使用创建了我的应用程序
  • 如何在 Swift 中获取由整数表示的 Unicode 代码点?

    所以我知道如何将字符串转换为utf8格式 如下所示 for character in strings utf8 for example A will converted to 65 var utf8Value character 我已经阅读
  • “栅栏已经激活——来不及添加写入”

    下面的错误信息是什么意思 栅栏已经激活 来不及添加写入 以下是如何获取它的示例 环境 Mac OS X http en wikipedia org wiki Mac OS X Lion 流星0 3 8 项目创建 meteor create
  • 熊猫到D3。将数据帧序列化为 JSON

    我有一个包含以下列且没有重复项的 DataFrame region type name value 可以看作是一个层次结构 如下所示 grouped df groupby region type name 我想将此层次结构序列化为 JSON
  • 任何无需 GUI/X 会话即可使用 GreaseMonkey 脚本运行 Firefox 的方法

    我需要为第三方网站构建一个小型 监控 抓取工具 这是一个外部网站 其中包含有关我们访问者的统计信息 不幸的是 这个网站很难通过正常的 wget 机制 因为它使用了大量复杂的 JS 其中一部分是由 GWT 生成的 所以我的解决方法是创建一个
  • 如何让 celery Worker 停止接收新任务 (Kubernetes)

    因此 我们有一个 kubernetes 集群 运行一些带有 celery 工作线程的 pod 我们使用 python3 6 来运行这些工作程序 celery 版本是 3 1 2 我知道 真的很旧 我们正在努力升级它 我们还设置了一些自动缩放
  • git checkout 区别 git checkout origin/ 和 git checkout ?

    当我做git checkout origin bugfix NTP 183 datefnsgit 显示 Note checking out origin bugfix NTP 183 datefns You are in detached
  • 如何将 futures::Stream 写入磁盘而不先将其完全存储在内存中?

    这里有一个使用 Rusoto S3 下载文件的示例 如何将从 S3 使用 Rusoto 下载的文件保存到我的硬盘 https stackoverflow com questions 51287360 how to save a file d