I'm reading hundreds of large (6 GB) gzipped log files via GZIPInputStream that I want to parse. Suppose the format of each is:
Start of log entry 1
...some log details
...some log details
...some log details
Start of log entry 2
...some log details
...some log details
...some log details
Start of log entry 3
...some log details
...some log details
...some log details
I'm streaming the gzipped file contents line by line via BufferedReader.lines(). The stream looks like:
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
]
The start of each log entry can be identified by this predicate: line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to that predicate. Each "substream" should start when the predicate is true, and should collect lines while the predicate is false, until the predicate is true again, which denotes the end of that substream and the start of the next. The result would look like:
[
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
],
]
From there, I can take each substream and map it through new LogEntry(Stream<String> logLines), aggregating the related log lines into a LogEntry object.
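(For context, LogEntry is my own class, not a library type; the constructor shape I have in mind is roughly this:)

```java
import java.util.stream.*;

// Hypothetical: my own class, shown only to make the types concrete.
class LogEntry {
    private final String text;

    // Collapses one entry's lines back into a single block of text.
    LogEntry(Stream<String> logLines) {
        this.text = logLines.collect(Collectors.joining("\n"));
    }

    @Override
    public String toString() {
        return text;
    }
}
```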
Here's a rough idea of what I mean:
import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.lang.System.out;
class Untitled {
    static final String input =
        "Start of log entry 1\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 2\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 3\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details";

    static final Predicate<String> isLogEntryStart =
        line -> line.startsWith("Start of log entry");

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
                 = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
             BufferedReader reader = new BufferedReader(inputStreamReader)) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}
Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), which makes it infeasible to load them fully into memory (e.g. by storing them as a List<String> lines).

Any help is appreciated!
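In case it helps frame answers, here is a rough, untested sketch of the direction I've been considering (splitByPredicate and Splitter are my own hypothetical names, not JDK API). It wraps the stream's line iterator in a grouping Iterator, so only one entry's lines are buffered at a time; each group comes out as a List<String>, which could be re-streamed via List::stream if a true Stream<Stream<String>> is needed:

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.*;

class Splitter {
    // Lazily groups a sequential stream of lines into one List<String> per
    // log entry. Memory use is bounded by the size of a single entry, not
    // the size of the file. Lines before the first match are skipped.
    static Stream<List<String>> splitByPredicate(Stream<String> lines,
                                                 Predicate<String> isStart) {
        Iterator<String> it = lines.iterator();
        Iterator<List<String>> grouped = new Iterator<>() {
            String pending = null; // start line already read for the next group

            @Override
            public boolean hasNext() {
                if (pending != null) return true;
                while (it.hasNext()) {
                    String line = it.next();
                    if (isStart.test(line)) {
                        pending = line;
                        return true;
                    }
                }
                return false;
            }

            @Override
            public List<String> next() {
                if (!hasNext()) throw new NoSuchElementException();
                List<String> group = new ArrayList<>();
                group.add(pending);
                pending = null;
                // Collect detail lines until the next start line (or EOF).
                while (it.hasNext()) {
                    String line = it.next();
                    if (isStart.test(line)) {
                        pending = line; // saved for the next group
                        break;
                    }
                    group.add(line);
                }
                return group;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(grouped, Spliterator.ORDERED),
            false);
    }

    public static void main(String[] args) {
        Stream<String> lines = Stream.of(
            "Start of log entry 1", " ...a", " ...b",
            "Start of log entry 2", " ...c");
        splitByPredicate(lines, l -> l.startsWith("Start of log entry"))
            .forEach(System.out::println);
        // prints:
        // [Start of log entry 1,  ...a,  ...b]
        // [Start of log entry 2,  ...c]
    }
}
```

With this, the pipeline in my sketch would become reader.lines() piped through splitByPredicate(...), then .map(group -> new LogEntry(group.stream())). I don't know whether this is idiomatic, though, or whether something in java.util.stream already covers it.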