如何处理来自 S3 的大文件并在 Spring Batch 中使用它

2024-04-10

我有一个 CSV 文件,其中包含数百万条记录,大小约为 2GB。我的用例是从 S3 读取 CSV 文件并对其进行处理。请在下面找到我的代码:

在下面的代码中,我从 S3 存储桶读取文件并使用inputStream直接在 Spring 批处理中平面文件项读取器 reader.setResource(new InputStreamResource(inputStream));

根据此实现,我在内存中保存 2GB 内容并对其进行处理,这不是一种有效的方法 - 有人可以建议从 S3 存储桶中读取大文件并在 S3 存储桶中处理它的有效方法是什么吗?春季批次。

提前感谢您的帮助!谢谢。

@Component
public class GetFileFromS3 {

    public S3ObjectInputStream dowloadFile(String keyName, String bucketName, String region) {
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(new ClientConfiguration())
                    .withRegion(region).build();

            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
        } catch (AmazonServiceException e) {
            e.printStackTrace();
        }
        return null;
    }

}




public class SpringBatch {

    @Autowired
    private GetFileFromS3 getFileFromS3;


 @Bean(name = "csvFile")
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        S3ObjectInputStream inputStream = getFileFromS3.dowloadFile("employee.csv", "testBucket", "us-east-1");
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(Employee.fields());
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                    {
                        setTargetType(Employee.class);
                    }
                });
            }
        });
        return reader;
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new ItemProcessor();
    }

    @Bean
    public ItemWriter<Employee> writer() {
        return new ItemWriter<Event>();
    }

    }

利用ResourceLoader,我们可以像其他资源一样在ItemReader中读取S3中的文件。这将有助于以块的形式读取 S3 中的文件,而不是将整个文件加载到内存中。

随着依赖注入ResourceLoader and AmazonS3 client,已更改阅读器配置如下:

替换值sourceBucket and sourceObjectPrefix如所须。

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private AmazonS3 amazonS3Client;

// READER
@Bean(destroyMethod="")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader() {
    SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = yourBucketName;
    String sourceObjectPrefix = yourSourceObjectPrefix;
    log.info("sourceObjectPrefix::"+sourceObjectPrefix);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do{
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()){

            if(!(sourceFile.getSize() > 0)
                    || (!sourceFile.getKey().endsWith(DOT.concat("csv")))
            ){
                // Skip if file is empty (or) file extension is not "csv"
                continue;
            }
            log.info("Reading "+sourceFile.getKey());
            resourceList.add(resourceLoader.getResource("s3://".concat(sourceBucket).concat("/")
                    .concat(sourceFile.getKey())));
        }
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    }while(sourceObjectsListing.isTruncated());

    Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);
    MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("employee-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(employeeFileItemReader());
    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;
}

@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader()
{
    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(Employee.fields());
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                {
                    setTargetType(Employee.class);
                }
            });
        }
    });
    return reader;
}

以 MultiResourceItemReader 为例。即使您正在查找的特定 S3 路径中有多个 CSV 文件,这也可以工作。

如果只处理某个位置的一个 CSV 文件,它也可以隐式地使用Resources[] resources包含一个条目。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何处理来自 S3 的大文件并在 Spring Batch 中使用它 的相关文章

  • 如何将 javax.persistence.Column 定义为 Unsigned TINYINT?

    我正在基于 MySQL 数据库中的现有表创建 Java 持久性实体 Bean 使用 NetBeans IDE 8 0 1 我在这个表中遇到了一个字段 其类型为 无符号 TINYINT 3 我发现可以执行以下操作将列的类型定义为 unsign
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 如何在 Firebase 远程配置中从 JSON 获取值

    我是 Android 应用开发和 Firebase 的新手 我想知道如何获取存储在 Firebase 远程配置中的 JSONArray 文件中的值 String 和 Int 我使用 Firebase Remote Config 的最终目标是
  • 使用 Ant 将非代码资源添加到 jar 文件

    我正在将 java 应用程序打包成 jar 文件 我正在使用 ant 和 eclipse 我实际上需要在 jar 中直接在根文件夹下包含几个单独的非代码文件 xml 和 txt 文件 而不是与代码位于同一位置 我正在尝试使用includes
  • 不同类型的数组

    是否可以有一个包含两种不同类型数据的数组 我想要一个包含双精度型和字符串的数组 我尝试过 ArrayList
  • 在 Wildfly 中与 war 部署共享 util jar 文件

    假设我有一个名为 util jar 的 jar 文件 该 jar 文件主要包含 JPA 实体和一些 util 类 无 EJB 如何使这个 jar 可用于 Wildfly 中部署的所有 war 无需将 jar 放置在 war 的 WEB IN
  • Integer.parseInt("0x1F60A") 以 NumberformatException 结束

    我尝试从数据库中获取长字符串内的表情符号代码 格式如下 0x1F60A 所以我可以访问代码 但它将是String 起初 我尝试通过执行以下操作来转换变量tv setText beforeEmo getEmijoByUnicode int e
  • 是否可以使用 Flying Saucer (XHTML-Renderer) 将 css 解析为类路径资源?

    我正在尝试将资源打包到 jar 中 但我无法让 Flying Saucer 在类路径上找到 css 我无法轻松构建 URL 来无缝解决此问题 https stackoverflow com questions 861500 url to l
  • 大数据使用什么数据结构

    我有一个包含一百万行的 Excel 工作表 每行有 100 列 每行代表一个具有 100 个属性的类的实例 列值是这些属性的值 哪种数据结构最适合在这里使用来存储数百万个数据实例 Thanks 这实际上取决于您需要如何访问这些数据以及您想要
  • Kotlin 未解决的参考:CLI 上 gradle 的 println

    放一个printlnkotlin 函数返回之前的语句会崩溃 堆栈跟踪 thufir dur NetBeansProjects kotlin thufir dur NetBeansProjects kotlin gradle clean bu
  • Spring Security SAML2 使用 G Suite 作为 Idp

    我正在尝试使用 Spring Security 5 3 3 RELEASE 来处理 Spring Boot 应用程序中的 SAML2 身份验证 Spring Boot 应用程序将成为 SP G Suite 将成为 IDP 在我的 Maven
  • 如何检测 Java 字符串中的 unicode 字符?

    假设我有一个包含 的字符串 我如何找到所有这些 un icode 字符 我应该测试他们的代码吗 我该怎么做呢 例如 给定字符串 A X 我想将其转换为 AYXY 我想对其他 unicode 字符做同样的事情 并且我不想将它们存储在某种翻译映
  • 如何避免 ArrayIndexOutOfBoundsException 或 IndexOutOfBoundsException? [复制]

    这个问题在这里已经有答案了 如果你的问题是我得到了java lang ArrayIndexOutOfBoundsException在我的代码中 我不明白为什么会发生这种情况 这意味着什么以及如何避免它 这应该是最全面的典范 https me
  • 如何在 Spring 属性中进行算术运算?

  • HashMap 值需要不可变吗?

    我知道 HashMap 中的键需要是不可变的 或者至少确保它们的哈希码 hashCode 不会改变或与另一个具有不同状态的对象发生冲突 但是 HashMap中存储的值是否需要与上面相同 为什么或者为什么不 这个想法是能够改变值 例如在其上调
  • 返回 Java 8 中的通用函数接口

    我想写一种函数工厂 它应该是一个函数 以不同的策略作为参数调用一次 它应该返回一个函数 该函数根据参数选择其中一种策略 该参数将由谓词实现 嗯 最好看看condition3为了更好的理解 问题是 它没有编译 我认为因为编译器无法弄清楚函数式
  • 使用按钮作为列表的渲染器

    我想使用一个更复杂的渲染器 其中包含列表的多个组件 更准确地说 类似于this https stackoverflow com questions 10840498 java swing 1 6 textinput like firefox
  • 如何重新启动死线程? [复制]

    这个问题在这里已经有答案了 有哪些不同的可能性可以带来死线程回到可运行状态 如果您查看线程生命周期图像 就会发现一旦线程终止 您就无法返回到新位置 So 没有办法将死线程恢复到可运行状态 相反 您应该创建一个新的 Thread 实例
  • Java中HashMap和ArrayList的区别?

    在爪哇 ArrayList and HashMap被用作集合 但我不明白我们应该在哪些情况下使用ArrayList以及使用时间HashMap 他们两者之间的主要区别是什么 您具体询问的是 ArrayList 和 HashMap 但我认为要完
  • 泛型、数组和 ClassCastException

    我想这里一定发生了一些我不知道的微妙事情 考虑以下 public class Foo

随机推荐