如何使用 aws-java-sdk 从 S3 中逐块读取文件

2024-01-09

我正在尝试从 S3 将大文件读取成块,而不切割任何行以进行并行处理。

让我通过例子来解释一下: S3上有1G大小的文件。我想将此文件分成 64 MB 的块。我可以很容易地做到这一点:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64*1024*1024];

while (stream.read(content)  != -1) {

//process content here 

}

但块的问题是它可能有 100 行完整的行和 1 行不完整的行。但我无法处理不完整的行并且不想丢弃它。

有什么办法可以处理这种情况吗?表示所有卡盘都没有偏线。


我通常的做法(InputStream -> BufferedReader.lines()-> 批量行 ->CompletableFuture)在这里不起作用,因为底层S3ObjectInputStream对于大文件最终会超时。

所以我创建了一个新类S3InputStream,它不关心它打开多长时间,并使用短暂的 AWS SDK 调用按需读取字节块。您提供一个byte[]将被重复使用。new byte[1 << 24](16Mb) 看起来效果很好。

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++];
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 aws-java-sdk 从 S3 中逐块读取文件 的相关文章

随机推荐

  • VB.NET LINQ 按多列分组

    我有以下 LINQ 代码 该代码有语法错误 但我不知道如何修复它 Dim query From row In mainDatatable AsEnumerable the syntax Error is in the following l
  • 保持当前页面呈现,直到加载下一页

    我有一个基于 WebView loadUrl http www example com http www example com 当用户单击 URL 时 默认行为是立即显示空白页面 等待页面加载 然后显示此页面 我设法显示启动屏幕来代替空白
  • POSIX TIMER - 有多个计时器

    我试图在我的系统中有两个计时器用于两个不同的目的 但我不明白为什么它不起作用 有人可以帮助我吗 另外 处理程序代码是否应该是最低限度的 以便任务本身不会干扰滴答声 我还可以定义单独的处理程序吗 include
  • 如何在客户注册表中添加地址字段?

    我使用的是magento 1 6 我想在客户注册表上显示地址字段 我从 register phtml 中删除了以下几行 但它不起作用 所以我该怎么做 如果您使用的是 magento 1 6 或更高版本 并且只需在customers form
  • One-hot 编码多级列数据

    我有以下数据框 其中包含有关不同主题的特征的记录 ID Feature 1 A 1 B 2 A 1 A 3 B 3 B 1 C 2 C 3 D 我想获得另一个 聚合的 数据帧 其中每一行代表一个特定主题 并且有所有单热编码功能的详尽列表 I
  • Cordova 应用程序可以编译,但在运行时崩溃。如何获取错误报告?

    我有一个 Cordova 应用程序 编译时没有错误 然而 当我将应用程序加载到我的 Android 设备上时 它在启动时立即崩溃 当我不知道错误是什么时 我很难进行调试 问题 有没有办法从这次崩溃中获取错误报告 我通常使用 Chrome 来
  • 在哪里以及如何安装 ArcPy for Python 2.7?

    我检查过http www lfd uci edu gohlke pythonlibs http www lfd uci edu gohlke pythonlibs http help arcgis com en arcgisdesktop
  • 如何在 for 循环中追加 pandas 数据框中的行? [复制]

    这个问题在这里已经有答案了 我有以下 for 循环 for i in links data urllib2 urlopen str i read data json loads data data pd DataFrame data ite
  • 如何在 Vim 中进行类似于“grep -w”的全字搜索

    我如何进行全字搜索 例如grep w在 Vim 中 它只返回所寻找的字符串是整个单词而不是较大单词的一部分的行 grep w 仅选择包含构成整个单词的匹配项的行 这可以在 Vim 中完成吗
  • 数据结构:此类练习中 pop、push、dequeue、enqueue 的解释

    我正在努力熟悉这 4 个概念 所以如果我们有一个数组 15 34 23 32 15 5 我们的业务包括 pop push 30 enqueue 40 dequeue 100 pop 只会删除第一个数字 即 15 对吧 如果是 pop 20
  • 如何同时静默对 Rails 控制器操作的调用

    我已经弄清楚如何通过将操作方法 内的所有内容包装在 logger silence 块中来使操作的内容静音 但是 我仍然收到对日志文件中显示的操作的调用 I E Processing DashboardController update fo
  • 如何从 获取持续时间,如 int milli 和 float 秒?

    我正在尝试使用 chrono 库来设置计时器和持续时间 我希望能够拥有一个Duration frameStart 从应用程序启动 和一个Duration frameDelta 帧之间的时间 我需要能够得到frameDelta持续时间为毫秒和
  • 在 Xamarin.iOS 中创建可扩展的表格视图?

    我正在尝试使用创建可扩展的表格视图Xamarin iOS具有手风琴类型的功能 例如 最初会有一些行 点击任何行 单元格将扩展到表视图中的其他一些行 Thanks 嘿 我制作了一个可扩展表格视图的示例here https github com
  • 我可以使用 OpenCL 分配设备内存并在 CUDA 中使用指向内存的指针吗?

    假设我使用 OpenCL 来管理内存 以便 GPU CPU 之间的内存管理使用相同的代码 但我的计算使用优化的 CUDA 和 CPU 代码 不是 OpenCL 我仍然可以使用 OpenCL 设备内存指针并将它们传递给 CUDA 函数 内核吗
  • 学习正确使用VBO

    因此 我一直在尝试自学使用 VBO 以提高 OpenGL 项目的性能并学习比固定功能渲染更高级的东西 但我还没有找到太多像样的教程 到目前为止我发现的最好的是宋浩的教程 http www songho ca opengl gl vbo ht
  • 修改 beforeFind 回调中所需的 Containable 字段?

    在我的 CakePHP 1 2 5 应用程序中 我有一个Profile模型属于User模型 用户模型有一个username字段 并且当执行find 在 Profile 模型上 我希望始终自动检索User username也 我认为修改我的配
  • Angular2 中组件属性变化的可观察

    当在 Angular 2 中创建一个通过 Input 具有输入属性的组件时 如何从对该属性 Input 所做的更改中获取可观察值 不要与用户表单输入混淆 export class ExampleComponent implement OnC
  • 隐式解包的选项真的是可选的吗?

    在 Swift 4 0 中 以下代码无法编译 var str String func someFunc s inout String someFunc str 现在我想象str属于类型String 事实上 Swift 编译器似乎也同意 无法
  • 在 C# 中解密使用 RSA 在 iPhone 上加密的内容时遇到问题

    到目前为止 我已经花了两天时间研究这个问题 并梳理了我可以使用的所有资源 所以这是最后的手段 我有一个 X509 证书 其公钥已存储在 iPhone 的钥匙串中 此时仅限模拟器 在 ASP NET 方面 我已在证书存储区中使用私钥获取了证书
  • 如何使用 aws-java-sdk 从 S3 中逐块读取文件

    我正在尝试从 S3 将大文件读取成块 而不切割任何行以进行并行处理 让我通过例子来解释一下 S3上有1G大小的文件 我想将此文件分成 64 MB 的块 我可以很容易地做到这一点 S3Object s3object s3 getObject