这其实是doable。您可以像您所描述的那样进行选择,但仅限于特定格式:JSON、CSV、Parquet。
想象一下有一个data.json
文件输入so67315601
桶里eu-central-1
:
{
"a": "dataA",
"b": "dataB",
"c": "dataC",
"d": "dataD",
"e": "dataE"
}
首先,了解如何通过 S3 控制台选择字段。使用“对象操作”→“使用 S3 Select 查询”:
![enter image description here](https://i.stack.imgur.com/qLmNj.png)
AWS Java 开发工具包 1.x
以下是使用 AWS Java SDK 1.x 进行选择的代码:
@ExtendWith(S3.class)
class SelectTest {
@AWSClient(endpoint = Endpoint.class)
private AmazonS3 client;
@Test
void test() throws IOException {
// LINES: Each line in the input data contains a single JSON object
// DOCUMENT: A single JSON object can span multiple lines in the input
final JSONInput input = new JSONInput();
input.setType(JSONType.DOCUMENT);
// Configure input format and compression
final InputSerialization inputSerialization = new InputSerialization();
inputSerialization.setJson(input);
inputSerialization.setCompressionType(CompressionType.NONE);
// Configure output format
final OutputSerialization outputSerialization = new OutputSerialization();
outputSerialization.setJson(new JSONOutput());
// Build the request
final SelectObjectContentRequest request = new SelectObjectContentRequest();
request.setBucketName("so67315601");
request.setKey("data.json");
request.setExpression("SELECT s.a, s.b FROM s3object s LIMIT 5");
request.setExpressionType(ExpressionType.SQL);
request.setInputSerialization(inputSerialization);
request.setOutputSerialization(outputSerialization);
// Run the query
final SelectObjectContentResult result = client.selectObjectContent(request);
// Parse the results
final InputStream stream = result.getPayload().getRecordsInputStream();
IOUtils.copy(stream, System.out);
}
}
输出是:
{"a":"dataA","b":"dataB"}
AWS Java 开发工具包 2.x
AWS Java SDK 2.x 的代码更加狡猾。参考这张票了解更多信息。
@ExtendWith(S3.class)
class SelectTest {
@AWSClient(endpoint = Endpoint.class)
private S3AsyncClient client;
@Test
void test() throws Exception {
final InputSerialization inputSerialization = InputSerialization
.builder()
.json(JSONInput.builder().type(JSONType.DOCUMENT).build())
.compressionType(CompressionType.NONE)
.build();
final OutputSerialization outputSerialization = OutputSerialization.builder()
.json(JSONOutput.builder().build())
.build();
final SelectObjectContentRequest select = SelectObjectContentRequest.builder()
.bucket("so67315601")
.key("data.json")
.expression("SELECT s.a, s.b FROM s3object s LIMIT 5")
.expressionType(ExpressionType.SQL)
.inputSerialization(inputSerialization)
.outputSerialization(outputSerialization)
.build();
final TestHandler handler = new TestHandler();
client.selectObjectContent(select, handler).get();
RecordsEvent response = (RecordsEvent) handler.receivedEvents.stream()
.filter(e -> e.sdkEventType() == SelectObjectContentEventStream.EventType.RECORDS)
.findFirst()
.orElse(null);
System.out.println(response.payload().asUtf8String());
}
private static class TestHandler implements SelectObjectContentResponseHandler {
private SelectObjectContentResponse response;
private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
private Throwable exception;
@Override
public void responseReceived(SelectObjectContentResponse response) {
this.response = response;
}
@Override
public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
publisher.subscribe(receivedEvents::add);
}
@Override
public void exceptionOccurred(Throwable throwable) {
exception = throwable;
}
@Override
public void complete() {
}
}
}
如您所见,可以通过编程方式进行 S3 选择!
你可能想知道那些是什么@AWSClient
and @ExtendWith( S3.class )
?
这是一个小型库,用于在测试中注入 AWS 客户端,名为aws-junit5。这将大大简化您的测试。我是作者。用途非常简单——在你的下一个项目中尝试一下!