Java 通过谓词将流拆分为流的流

2023-11-22

我正在阅读数百个大型 (6GB) gzip 日志文件GZIPInputStream是我想解析的。假设每一项的格式如下:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

我正在逐行流式传输 gzip 压缩文件内容BufferedReader.lines()。该流看起来像:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

每个日志条目的开始可以通过谓词来识别:line -> line.startsWith("Start of log entry")。我想改变这个Stream<String> into a Stream<Stream<String>>根据这个谓词。每个“子流”应该在谓词为真时开始,并在谓词为假时收集行,直到下一次谓词为真,这表示该子流的结束和下一个子流的开始。结果如下:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

从那里,我可以获取每个子流并将其映射到new LogEntry(Stream<String> logLines)以便将相关日志行聚合到LogEntry对象。

这是一个大概的想法:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

约束:我有数百个这样的大文件要并行处理(但每个文件只有一个顺序流),这使得将它们完全加载到内存中(例如,通过将它们存储为List<String> lines)是不可行的。

任何帮助表示赞赏!


Frederico 的答案可能是解决这个特定问题的最好方法。遵循他最后关于习俗的想法Spliterator,我将添加答案的改编版本类似的问题,我建议使用自定义迭代器来创建分块流。此方法也适用于不是由输入读取器创建的其他流。

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

迭代器总是向前看一行。一旦该行成为新条目的开始,它就会将前一个条目包装在流中并将其返回为next。工厂方法streamOf将此迭代器转换为要在上面给出的示例中使用的流。

我将分割条件从正则表达式更改为Predicate,因此您可以借助多个正则表达式、if 条件等指定更复杂的条件。

请注意,我仅使用上面的示例数据对其进行了测试,因此我不知道它在更复杂、错误或空输入的情况下会如何表现。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java 通过谓词将流拆分为流的流 的相关文章

  • 带有 Android 支持库 v7 的 Maven Android 插件

    我使用 maven android plugin 构建我的 android 应用程序 它依赖于 android 支持库 v4 和 v7 由于我没有找到如何从developer android com下载整个sdk 因此我无法使用maven
  • 如何在 Openfire 中使用 smack

    你好 我计划开发一个可以连接到 gtalk facebook 等的聊天客户端 我决定将 smack API 与 openfire 一起使用 但我需要很少的指导来了解如何将它与 openfire 服务器一起使用 openfire 是否提供了基
  • 将SQL数据引入jquery availabletag

    我正在尝试制作自动完成文本框 但如何将 SQL 数据包含到 jquery 可用标记并循环它 我无法根据以下代码执行该功能 任何帮助 将不胜感激 谢谢 这是我的预期输出 预期结果演示 http jsfiddle net VvETA 71 jq
  • 删除优先级队列的尾部元素

    如何删除优先级队列的尾部元素 我正在尝试使用优先级队列实现波束搜索 一旦优先级队列已满 我想删除最后一个元素 优先级最低的元素 Thanks 没有简单的方法 将元素从原始元素复制到新元素 最后一个除外 PriorityQueue remov
  • 埃拉托色尼筛法 - 实现返回一些非质数值?

    我用 Java 实现了埃拉托斯特尼筛法 通过伪代码 public static void sieveofEratosthenes int n boolean numArray numArray new boolean n for int i
  • Logback:SizeAndTimeBasedRollingPolicy 不遵守totalSizeCap

    我正在尝试以一种方式管理我的日志记录 一旦达到总累积大小限制或达到最大历史记录限制 我最旧的存档日志文件就会被删除 当使用SizeAndTimeBasedRollingPolicy在 Logback 1 1 7 中 滚动文件追加器将继续创建
  • 在 Struts 2 中传递 URL 参数而不使用查询字符串

    我想使用类似的 URL host ActionName 123 abc 而不是像这样传递查询字符串 host ActionName parm1 123 parm2 abc 我怎样才能在 Struts 2 中做到这一点 我按照下面的方法做了
  • Spring数据中的本机查询连接

    我有课 Entity public class User Id Long id String name ManyToMany List
  • 是否可以从 servlet 内部以编程方式设置请求上下文路径?

    这是一个特殊情况 我陷入了处理 企业 网络应用程序的困境 企业应用程序正在调用request getContext 并将其与另一个字符串进行比较 我发现我可以使用 getServletContext getContextPath 获取 se
  • 添加到列表时有没有办法避免循环?

    我想知道这样的代码 List
  • Eclipse - 安装新的 JRE (Java SE 8 1.8.0)

    我正在尝试安装 Java 8 到目前为止我所做的 安装最新版本的 Eclipse 下载并安装 Java SE 运行时环境 8http www oracle com technetwork java javase downloads jre8
  • 在 Java 中通过 XSLT 分解 XML

    我需要转换具有嵌套 分层 表单结构的大型 XML 文件
  • 如何删除日期对象的亚秒部分

    当 SQL 数据类型为时间戳时 java util Date 存储为 2010 09 03 15 33 22 246 如何在存储记录之前将亚秒设置为零 例如 在本例中为 246 最简单的方法是这样的 long time date getTi
  • 寻找局部最小值

    下面的代码正确地找到了数组的局部最大值 但未能找到局部最小值 我已经进行了网络搜索 以找到找到最小值的最佳方法 并且根据这些搜索 我认为我正在使用下面的正确方法 但是 在几天的时间里多次检查每一行之后 下面的代码中有一些我仍然没有看到的错误
  • 我可以限制分布式应用程序发出的请求吗?

    我的应用程序发出 Web 服务请求 提供商处理的请求有最大速率 因此我需要限制它们 当应用程序在单个服务器上运行时 我曾经在应用程序级别执行此操作 一个对象跟踪到目前为止已发出的请求数量 并在当前请求超出允许的最大负载时等待 现在 我们正在
  • 如何处理 StaleElementReferenceException

    我正在为鼠标悬停工作 我想通过使用 for 循环单击每个链接来测试所有链接的工作条件 在我的程序中 迭代进行一次 而对于下一次迭代 它不起作用并显示 StaleElementReferenceException 如果需要 请修改代码 pub
  • 使用 Java https 上传到 Imgur v3 错误

    我目前正在尝试使用他们当前的 API v3 上传到 imgur 但是我不断收到错误 错误 javax net ssl SSLException 证书中的主机名不匹配 api imgur com imgur com OR imgur com
  • ECDH使用Android KeyStore生成私钥

    我正在尝试使用 Android KeyStore Provider 生成的私有文件在 Android 中实现 ECDH public byte ecdh PublicKey otherPubKey throws Exception try
  • 何时在 hibernate 中使用 DiscriminatorValue 注解

    在 hibernate 中使用 DiscriminatorValue 注释的最佳场景是什么以及何时 这两个链接最能帮助我理解继承概念 http docs oracle com javaee 6 tutorial doc bnbqn html
  • 基于 Spring Boot 的测试中的上下文层次结构

    我的 Spring Boot 应用程序是这样启动的 new SpringApplicationBuilder sources ParentCtxConfig class child ChildFirstCtxConfig class sib

随机推荐

  • 升级到 swift 2,cocoapods -.38.2 现在出现构建错误命令 /bin/sh 失败,退出代码 23

    我该如何诊断这个错误Command bin sh failed with exit code 23 sent 130971 bytes received 42 bytes 262026 00 bytes sec total size is
  • 如何将文件(docx、doc、pdf 或 json)发送到 fastapi 并在没有 UI(即 HTML)的情况下对其进行预测? [复制]

    这个问题在这里已经有答案了 如果您知道如何将文件发送到 FastAPI 服务器并在 predict 端点中访问它以使用我的模型进行预测 请帮助我 我已经使用 predict 端点部署了模型并完成uvicorn main app它已部署 但唯
  • 如何从通过 SSHExecuteOperator 推送的 Airflow XCom 检索值

    我有以下 DAG 和两个 SSHExecuteOperator 任务 第一个任务执行一个返回参数的存储过程 第二个任务需要此参数作为输入 请解释一下如何从任务 1 中推送的 XCom 中提取值 以便在任务 2 中使用它 from airfl
  • Python API 设计中的重载(或替代方案)

    我有一个大型的现有程序库 当前具有 NET 绑定 并且我正在考虑编写 Python 绑定 现有的 API 广泛使用基于签名的重载 所以 我有大量静态函数 例如 Circle p1 p2 p3 Creates a circle through
  • Intellij 新的 Java 类打开与平常不同的对话框

    通过创建新的java类时新建 gt Java 类使用时智能集成开发环境 一个随机对话框开始出现 我找不到如何禁用它 这是开始显示的奇怪对话框 有点像向导 单击帮助按钮获取我的帮助There is no help for this dialo
  • 我可以覆盖用 C 编写的 Ruby 方法吗?

    是否可以覆盖 Ruby 本身的方法 例如rb error frozen 用 C 语言编写 带有 Ruby 代码 背景 我想知道当修改冻结对象时 是否可以让 Ruby 仅记录警告 而不引发异常 这样 我可以记录各种状态修改 而不是在第一个发生
  • 从 GitHub 项目导出可用的 .jar 文件

    我对 java 和 eclipse 相当陌生 我想从这个 github 项目中获取 jar 文件 https github com nikkiii omegle api java 我已经尝试在 Eclipse 中这样做 但我无法将项目作为
  • docker 未考虑 docker-compose.yml 中指定的资源限制

    我正在尝试设置资源限制docker compose yml file 这里是 version 3 7 services postgres build docker postgres container name postgres ports
  • 在Python中替换unicode字符串中的非ascii字符

    如何在Python中替换unicode字符串中的非ascii字符 这是我针对给定输入观察的输出 音乐 gt 音乐 纸箱 gt 纸箱 卡诺 gt 卡诺 也许有一个字典 其中 是键 a 是值 如果您只想将重音字符降级为非重音字符 gt gt g
  • 在 Raku 中使用 Perl 5 模块 Data::Printer 的 `show_tied` 选项时,如何关闭它?

    我使用了带有 Perl 的 CPAN Perl 模块 Data Printer DP 效果很好 现在我想在 Raku 代码中使用它 When I use the from
  • Kivy:标签文本在 for 循环期间不会更新

    当我尝试在 for 循环期间更新标签文本时遇到问题 还有类似的条目 例如 运行代码时更新 kivy 小部件的属性 但它们似乎并不完全符合我的问题 或者我错过了重点 我运行以下代码 py from kivy app import App fr
  • C# 创建/修改/读取 .xlsx 文件

    我正在寻找一种在 C 中创建 修改 读取 xlsx 文件的方法 而无需安装 Excel 或在服务器上创建文件 然后再提供给用户下载 我找到了NPOIhttp npoi codeplex com 看起来不错 但支持 xls 而不是 xlsx
  • 32 位字的镜像位

    你会如何在 C 中做到这一点 例如 如果我们必须镜像 8 位 则 10110001 会变为 10001101 某些处理器上是否有任何指令可以简化此任务 它实际上被称为 位反转 通常在 FFT 加扰中完成 O log N 方式是 最多 32
  • perl6 可以在匹配中使用连接吗?

    是否可以使用 junction 来匹配 junction 中的任何值 我想匹配数组中的任何值 正确的做法是什么 lisprog perl6 To exit type exit or D gt my a a a b c gt any a an
  • 检查项目是否已在上下文菜单中[重复]

    这个问题在这里已经有答案了 不久前 我可以在 Google Chrome 扩展中保存上下文菜单中创建的项目的 ID 背景 js var myItem if myItem MyItem myItem chrome contextMenus c
  • 使用 CRON 作业访问 url?

    我有一个网络应用程序 必须执行重复的任务 发送消息和警报 我已经使用脚本页面在浏览器中加载时执行这些任务 即http example com tasks php我通过 iframe 将其包含在我的 Web 应用程序的每个页面中 现在我想改变
  • Android 请求多个权限

    我正在修改现有的面部跟踪器应用程序 Android 的面部识别示例项目 我在请求多个永久权限时遇到问题 下面的方法是现有方法的修改版本 成功创建了一个弹出窗口来请求相机权限 我正在尝试使用存储权限来复制此内容 但到目前为止我还没有成功 并且
  • 如何通过API获取维基百科文章的一小段文字和主图?

    我正在尝试创建一个简单的维基百科克隆 允许用户搜索某个主题 然后显示 10 个包含文章图像和一小段文本的结果 我已经能够将用户提供的搜索字段传递给我的 ajax 打电话没有问题 但现在我无法检索图像 我已经阅读了 StackOverflow
  • 将线程分配给特定的CPU核心

    AFAIK 在 Linux 中可以将线程分配给 CPU 核心 看this 但是 我的问题是我可以使用以下方法实现此功能吗boost如果可能的话 又是如何实现的呢 请注意 操作系统也不能决定哪个对我来说更好 而是假设我想在设计中控制这种行为
  • Java 通过谓词将流拆分为流的流

    我正在阅读数百个大型 6GB gzip 日志文件GZIPInputStream是我想解析的 假设每一项的格式如下 Start of log entry 1 some log details some log details some log