I have a CSV file with millions of records, about 2 GB in size. My use case is to read the CSV file from S3 and process it. Please find my code below.
In the code below, I download the file from the S3 bucket as an InputStream and feed it directly to the Spring Batch FlatFileItemReader via reader.setResource(new InputStreamResource(inputStream));
With this implementation I end up holding the 2 GB of content in memory while processing it, which is not efficient. Can anyone suggest an efficient way to read a large file from an S3 bucket and process it with Spring Batch?
Thanks in advance for your help!
@Component
public class GetFileFromS3 {

    public S3ObjectInputStream dowloadFile(String keyName, String bucketName, String region) {
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withClientConfiguration(new ClientConfiguration())
                    .withRegion(region)
                    .build();
            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
        } catch (AmazonServiceException e) {
            e.printStackTrace();
        }
        return null;
    }
}
@Configuration
public class SpringBatch {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private GetFileFromS3 getFileFromS3;

    @Bean(name = "csvFile")
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
    @Bean
    public FlatFileItemReader<Employee> reader() {
        S3ObjectInputStream inputStream = getFileFromS3.dowloadFile("employee.csv", "testBucket", "us-east-1");
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper<Employee>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(Employee.fields());
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                    {
                        setTargetType(Employee.class);
                    }
                });
            }
        });
        return reader;
    }
    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return employee -> employee; // pass-through; replace with real processing logic
    }

    @Bean
    public ItemWriter<Employee> writer() {
        return items -> items.forEach(System.out::println); // placeholder writer
    }
}
Using a ResourceLoader, we can read a file in S3 in an ItemReader just like any other resource. This lets Spring Batch read the file from S3 in chunks instead of loading the whole file into memory.
With the ResourceLoader and the AmazonS3 client injected as dependencies, the reader configuration changes as below. Replace the values of sourceBucket and sourceObjectPrefix as required.
@Autowired
private ResourceLoader resourceLoader;

@Autowired
private AmazonS3 amazonS3Client;

// READER
@Bean(destroyMethod = "")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader() {
    SynchronizedItemStreamReader<Employee> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = yourBucketName;
    String sourceObjectPrefix = yourSourceObjectPrefix;
    log.info("sourceObjectPrefix::" + sourceObjectPrefix);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do {
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()) {
            // Skip if the file is empty or its extension is not "csv"
            if (sourceFile.getSize() <= 0 || !sourceFile.getKey().endsWith(".csv")) {
                continue;
            }
            log.info("Reading " + sourceFile.getKey());
            resourceList.add(resourceLoader.getResource(
                    "s3://" + sourceBucket + "/" + sourceFile.getKey()));
        }
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    } while (sourceObjectsListing.isTruncated());
    Resource[] resources = resourceList.toArray(new Resource[0]);
    MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("employee-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(employeeFileItemReader());
    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;
}
@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader() {
    FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<Employee>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(Employee.fields());
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                {
                    setTargetType(Employee.class);
                }
            });
        }
    });
    return reader;
}
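For resourceLoader.getResource("s3://...") to actually resolve S3 objects, the s3:// protocol has to be registered with the application's resource loading. With spring-cloud-starter-aws on the classpath this is normally auto-configured; the explicit bean below is only a rough sketch of what that wiring looks like (the class name S3ResourceConfig is illustrative, and whether you need to declare the resolver yourself depends on your Spring Cloud AWS version):

```java
import com.amazonaws.services.s3.AmazonS3;
import org.springframework.cloud.aws.core.io.s3.SimpleStorageProtocolResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class S3ResourceConfig {

    // Registers Spring Cloud AWS's S3 protocol resolver so that
    // "s3://bucket/key" resources are streamed through the AmazonS3 client
    // rather than buffered in memory.
    @Bean
    public SimpleStorageProtocolResolver s3ProtocolResolver(AmazonS3 amazonS3) {
        return new SimpleStorageProtocolResolver(amazonS3);
    }
}
```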
This example uses a MultiResourceItemReader, so it works even when there are multiple CSV files under the specific S3 prefix you are scanning.
If there is only one CSV file at that location, it still works: the Resource[] resources array simply contains a single entry.
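Because the MultiResourceItemReader is wrapped in a SynchronizedItemStreamReader, the step can also safely run the reader from multiple threads. A rough sketch of such a step (the bean name, chunk size, thread pool, and the processor/writer beans are illustrative assumptions, not part of the original code):

```java
// Multi-threaded chunk step driven by the synchronized reader.
// Tune the chunk size and throttle limit for your workload.
@Bean
public Step employeeStep() {
    return stepBuilderFactory.get("employeeStep")
            .<Employee, Employee>chunk(100)
            .reader(employeeDataReader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(new SimpleAsyncTaskExecutor("employee-batch-"))
            .throttleLimit(4) // cap concurrent chunk workers
            .build();
}
```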