StreamingFileSink 未将数据提取到 s3

2024-02-22

我创建了简单的摄取服务,该服务选择本地文件并使用 StreamingFileSink 摄取到 s3。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

我已经按照文档设置了所有内容,但它不起作用。我测试了接收器位置到另一个本地本地路径,文件正在到达那里(但隐藏为 .part 文件)

这是否意味着零件文件也会发送到 s3 但不可见?

...

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            String path = "/tmp/component_test";

            MyFileInputFormat myFileInputFormat = new MyFileInputFormat(new Path(path));
            myFileInputFormat.setNumSplits(1);

            ContinuousFileMonitoringFunction<String> monitoringFunction =
                    new ContinuousFileMonitoringFunction<>(myFileInputFormat,
                            FileProcessingMode.PROCESS_CONTINUOUSLY,
                            env.getParallelism(), 1000);


            // the monitor has always DOP 1
            DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);

            ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(myFileInputFormat);
            TypeInformation<String> typeInfo = new SimpleStringSchema().getProducedType();

            // the readers can be multiple
            DataStream<String> content = splits.transform("FileSplitReader", typeInfo, reader);

            SingleOutputStreamOperator<Tuple2<String, String>> ds = content.flatMap(
                    new XMLSplitter());


            //new Path("s3://<bucket_name>/raw/")
            //new Path("file:///tmp/raw/")
            StreamingFileSink<Tuple2<String, String>> sink = StreamingFileSink
                    .forRowFormat(new Path("s3a://<bucket-name>/raw/"),
                            (Tuple2<String, String> element, OutputStream stream) -> {
                                PrintStream out = new PrintStream(stream);
                                out.println(element.f1);
                            })
                    // Determine component type for each record
                    .withBucketAssigner(new ComponentBucketAssigner())
                    .withRollingPolicy(DefaultRollingPolicy.create().withMaxPartSize(100).withRolloverInterval(1000).build())
                    .withBucketCheckInterval(100)
                    .build();
            ds.addSink(sink);       
            FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));
            env.execute();
...

我正在寻找 s3 中的零件文件,或者我是否需要对 StreamingFileSink 进行任何更改以滚动最小大小的零件文件?

...

09:37:39,387 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 34d46d2671c996d6150d88a2f74b4218 (7558 bytes in 38 ms).
09:37:39,388 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=1.
09:37:39,389 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 1 received completion notification for checkpoint with id=1.
09:37:39,390 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 2 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 3 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER1>/part-1-0 with MPU ID CEYMmUslgCnA2KcD5pslz.7dpaQuCAqmTJo6oDPv7P.Rj45O4tHrVTfDQMABxrRvdWSTwO2RoIR.r9VP2s4IMxlPtHz9r6CP_iQ7.DcP9yGDLjIN1gaLPTunAhVGuGen
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER2>/part-0-0 with MPU ID ExM_.cfOZVvXHHGNakUeshSQrkLFtm3HytooPAxDet1MoXBEJYhxlEJBYyXFmeSpk7b.ElmoydrMgotnpZAgmsh6lGhQgMYoS2hFJtOZLtPCOLyJvOt3TKRecc8YqSAJ
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER3>/part-2-0 with MPU ID 64._ocicEwPAwrMrI_LXcKyEfqYtISKsLsheAjgXwGdpf3qTH0qvOM2C3k8s2L6UDJ8yZfm9YEJhopgQIrL0hmFokCyMa49bzUbhgm3KQmiCVe9CoNiTEb4ETnEJCZFA
09:37:39,393 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 4 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER4>/part-3-0 with MPU ID yuFGGVfh9YOL36mUUTIAyyLehCMyQGrYoabdv0BBe.e3uCIkLYLI6S4RfnCGtFsT2pjiEJq97bfftMycp4wGW5KKX4jsrmZAfK.kqiYnMUeWWcolXKmWOktVvwHvmSpB
09:37:39,394 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 5 received completion notification for checkpoint with id=1.
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 6 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER5>/part-4-0 with MPU ID Ab7sTpLJp3fNCCYVXe2nUO5qWmYxMeYQlOssRpeawoY2LDV.a58eShdp.Anfe6YxTnVIewCmReKiYSguJS2SlBxwNRPh2ax50nCXuSdfkyVazgiNMZYMUQJjbzTxgdYW
09:37:39,395 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER6>/part-5-0 with MPU ID xDbouvLhpX7q9rFrs9y93lc7wWO20L5mxKTCWFBAmAVkTWzEiGEu2bU5H2nnCrZWbcPDMePSdpOBK64lVoS8txuhLFtq_nkBfXIs2K6OY6NuTtiSDGWi4SrWwnedC6RM
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 7 received completion notification for checkpoint with id=1.
09:37:39,397 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER7>/part-6-0 with MPU ID 0uZ35XrL2ShWxZL5nlY3Z1KHTSHBsQhiaJ6HZ9CbzfgxFIf7bwRNjdGHQHWPs9N0WfcpQXBM12XbNENjfILXQ6CLCx0XZrgvGHakUgeWhfeBiOURrO8xUVMT1ot7gxIY

...


StreamingFileSink 仅在启用检查点时才起作用。零件文件作为检查点过程的一部分最终确定。

该文档最近已更新以解释这一点,但目前仅在文档的夜间构建中可用:https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

StreamingFileSink 未将数据提取到 s3 的相关文章

  • Java Arraylist of object 按日期从数组列表中删除元素

    这是我的数组列表 ArrayList
  • 面试问题 - 在排序数组 X 中搜索索引 i,使得 X[i] = i

    昨天面试时 我被问到了以下问题 考虑一个 Java 或 C 数组X它已排序并且其中没有两个元素是相同的 如何最好地找到索引i这样该索引处的元素也是i 那是X i i 作为澄清 她还给了我一个例子 Array X 3 1 0 3 5 7 in
  • JSP重定向和传值

    我有一个 JSP 其中我重定向到另一个 jsp 例如 我在该jsp中没有任何其他数据 我想将值从该jsp index jsp 传递到重定向jsp login jsp 我将如何做到这一点 这里的 logonInput 是在struts con
  • 用户“root”@“localhost”的访问被拒绝

    我正在尝试从数据库中获取记录 但我面临这个访问被拒绝的问题 我尝试了 Stack Overflow 上提到的其他解决方案 例如向用户授予权限 但没有任何效果 访问数据库的代码 public void service HttpServletR
  • java.sql.SQLException: - ORA-01000: 超出最大打开游标数

    我收到 ORA 01000 SQL 异常 所以我有一些与之相关的疑问 最大打开游标是否与 JDBC 连接数完全相关 或者它们也与我们为单个连接创建的语句和结果集对象相关吗 我们正在使用连接池 有没有办法配置数据库中语句 结果集对象的数量 如
  • 在 Android 中使用 lambdaj

    有人尝试过在android开发中使用lambdaj库吗 当我创建一个简单的小型java应用程序时 它对我来说工作得很好 但我无法在android应用程序中使用它 UPDATE 我正在添加 lambdaj lambdaj 2 3 2 with
  • 如何提高 Guice 启动时的性能

    好吧 我知道我的计算不客观等等 但无论如何 我讨厌在执行单元测试时等待这么多时间 我的 guice swing 应用程序需要大约 7 秒来初始化 这是一个简单的 IRC 客户端 在那一刻 没有打开连接 我什至还没有调用任何 java io
  • 按位非运算符

    为什么要按位运算 0 打印 1 在二进制中 不是0应该是1 为什么 你实际上很接近 在二进制中 不是0应该是1 是的 当我们谈论一位时 这是绝对正确的 然而 一个int其值为0的实际上是32位全零 将所有 32 个 0 反转为 32 个 1
  • 在 Java 和 PHP 之间加密/解密字符串

    我使用 AES 加密来加密和解密服务器端的 php 和 Android 应用程序 作为客户端 之间的字符串 PHP 中的加密字符串为 HaxRKnMxT24kCJWUXaVvqDHahzurJQK sYA4lIHql U 在 Java 中是
  • 让 Java 与 Windows 10 Ubuntu 一起使用

    我安装了 Windows 10 周年更新 以便可以在 Windows 上的 Ubuntu 上尝试 Bash 看如何安装 http www howtogeek com 249966 how to install and use the lin
  • 从外部 clojar 导入/使用资源

    我想做的是将一个大文件 MIDI 声音字体 打包到一个独立的 Maven repo clojar 中 然后能够以编程方式将其拉下来并从单独的项目中使用它 事实证明 这个看似简单的任务比我想象的要复杂 理想的情况是 如果有一种方法可以直接访问
  • 如何在最短的时间内克隆java中的输入流

    有人可以告诉我如何克隆输入流 并花费尽可能少的创建时间吗 我需要多次克隆输入流以使用多种方法来处理 IS 我尝试了三种方法 但由于这样或那样的原因 事情不起作用 方法 1 感谢 stackoverflow 社区 我发现以下链接很有帮助 并将
  • GAE - Eclipse 中的开发服务器未更新?

    我在 Eclipse 上使用 Google AppEngine 开发服务器 我的本地网页似乎没有更新 直到我在开发服务器上进行了多次重新启动 使用 Eclipse 中的 运行 或 调试 按钮 我究竟做错了什么 基本流程是 更改 java 文
  • 对于每个抛出异常的语句,try/catch 是否被视为反模式?

    我目前正在审查同事的 Java 代码 我看到很多情况下 每个可能抛出异常的语句都被封装在自己的 try catch 中 其中 catch 块都执行相同的操作 哪个操作与我的问题无关 对我来说 这似乎是一种代码味道 我记得读到过它是一种常见的
  • 如何将我的自定义相机应用程序设置为默认应用程序?

    如果我使用以下代码 Intent takePictureIntent new Intent MediaStore ACTION IMAGE CAPTURE startActivityForResult takePictureIntent 1
  • 当相应的 JTextfield 为空时,如何填充 JTable 中的所有项目

    我正在 Java 项目中设计一个高级搜索选项sqlite在 NetBeans 中 有5种不同JTextfields和 5 列 我想填充JTable具有相应的匹配标准 如果一个JTextfield为空 那么它应该选择该列的所有项目 我使用的查
  • java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.type。使用 apache beam Spark runner 运行 go 示例时

    我想跑grades https github com apache beam tree master sdks go examples gradesapache beam go sdk 提出的示例 在一个主服务器和两个从服务器 spark2
  • 如何实现再次播放功能?

    我希望在游戏结束时得到提示 如果我还想再玩一次的话 并使用 Y N 输入 退出游戏或重复游戏 我该如何以最有效的方式解决这个问题 编辑 描述资源路径位置类型 类型 Main Main java ScaredyCat src se grupp
  • java.lang.NoClassDefFoundError:com.google.ads.AdView

    我正在尝试将 admob 广告合并到我的应用程序中 到目前为止我已经添加了以下代码 在我的应用程序主要活动的 onCreate 方法中 adView new AdView this AdSize BANNER my code number
  • 文件构造函数说明

    我无法理解以下文件构造函数 public File String parent String child and public File File parent String child 参数有什么作用parent and child该文件

随机推荐

  • 如何在 Espresso 中的 RecyclerView 内部断言?

    我正在使用 espresso contrib 来执行操作RecyclerView 并且它应该正常工作 例如 click on first item onView withId R id recycler view perform Recyc
  • C# 生产质量线程安全内存中 LRU 缓存是否过期?

    这也许就像求棒上的月亮一样 但是是否有 C 生产质量的线程安全内存中 LRU 缓存 带过期 或者有人有最佳实践想法来实现同样的事情吗 LRU 是 最近最少使用 http en wikipedia org wiki Cache algorit
  • ListView 中每个项目的单独首选项?

    我正在创建我的第一个 Android 应用程序 或者无论如何都在尝试 但我有一个问题 我似乎找不到答案 我想允许用户为列表视图中的每个项目输入一组单独的首选项 我的 PreferenceScreen 正在工作 但它为每个项目维护相同的首选项
  • 为什么 scanf 中需要 %hd ?

    我创建了一个非常简单的程序 带有菜单 取一个值 然后将其记忆到 局部变量值 最后与 第二个选项是程序打印该值 我的问题是 为什么只有添加 h 程序才能运行 到 scanf 参数 换句话说 存在什么样的关系 scanf 和我的本地 int 值
  • Sprite Kit:大量带有 Bit Blitting 的 sprite(1000+)

    我正在尝试使用 SpriteKit 创建一个场景 其中包含数千个精灵 500 2000 每个精灵只是一个 1x1 的白色像素 甚至不需要为它们使用纹理 立即将这么多精灵直接添加到场景中是不可能的 或者至少我这么认为 在 iPhone 6 上
  • 玩法框架2:在route中使用Array[String]

    我想生成一个像这样的网址 照片 标签 标签1 标签2 标签3 路线文件 GET photo controllers Photos list tags Array String 我在播放控制台中收到此错误 找不到 Array String 类
  • 尝试使用 ServiceController 时出现错误 MSB4062

    我在 x64 计算机上使用 Visual Studio 2010 和 TFS 2010 我正在尝试在我的构建中使用 MSBuild 社区任务目标 该目标存在于源代码管理中 因此 在我的 csproj 文件中 我导入该特定目标 但现在出现以下
  • 在项目的层次结构中找不到新元素

    我正在尝试为 Xamarin 应用程序创建一个简单的页面 但完全无法继续执行最基本的步骤 我从项目存储库中检查分支 并尝试添加新文件 当我这样做时 右键单击文件夹 添加 gt 新项目 gt 内容页面 我收到此错误 它实际上创建了 xaml
  • 在使用像数组这样的变量之前分配 array()

    我试图找到一个合适的解释性标题 但我找不到 我将尝试解释我在这里问的问题 通常 如果您不将空数组分配给变量 则可以开始为索引分配值 如下所示 hello world Hello World echo hello world 但我总是遇到这样
  • 从工作线程使用 Flask SQLAlchemy

    我有一个 python 应用程序使用烧瓶宁静 http flask restful cn readthedocs io en 0 3 5也Flask SQLAlchemy http flask sqlalchemy pocoo org 2
  • Bootstrap 中缺少可见-** 和隐藏-**

    在Bootstrap v3中我经常使用hidden 类结合clearfix来控制不同屏幕宽度下的多列布局 例如 我可以在一个 DIV 中组合多个 hide 以使我的多列在不同的屏幕宽度下正确显示 举个例子 如果我想显示多行产品照片 在较大的
  • Common Lisp:如何使用条件拼接在宏中构建列表?

    我们假设 defmacro testing optional var list this is when consp var a list 当被调用时 gt testing 2 THIS IS gt testing list 1 2 THI
  • 从 asp.net/C# 查询 MUMPS

    有谁知道如何使用 C 从 MUMPS 数据库查询而不使用 KBSQL ODBC 我们需要从 MUMPS 数据库 Mckesson STAR Patient care 进行查询 当我们使用 KBSQL 时 它仅限于 6 个并发用户 因此我们尝
  • 如何回收/重用 CUDA 线程

    在 CUDA 中 如何为内核中的所有线程创建一个等待的屏障 直到CPU向该障碍发送一个信号 表明继续进行是安全的 有帮助的 我想避免启动 CUDA 内核的开销 有两种类型的开销需要避免 1 在 X 块和 Y 线程上简单启动内核的成本 以及
  • 如何在角度4中以动态形式设置ngIf动态条件

    我正在创建一个动态表单 在其中根据 JSON 的响应动态填充字段 Eg type text required true minlength 3 maxlength 5 name fname visibility true type text
  • 有没有办法使用 HashWithIn DifferentAccess 序列化 ActiveRecord 的 JSON 属性?

    我在用ActiveRecord ConnectionAdapters PostgreSQLAdapter在 Rails 应用程序中 假设我有一个架构 create table foo id bigserial force cascade d
  • 如果未发送,请重试发送邮件

    我正在使用 nodemailer 通过我的节点应用程序发送电子邮件 有时电子邮件不起作用并抛出错误 直到我尝试两次或三次 我希望我的程序一次又一次地尝试 直到邮件成功发送 这是我的代码 const mailOptions from from
  • Django 锁定身份验证 - 登录表单不起作用

    我一直在研究锁定整个页面的方法 一位同事引起了我的注意Django 锁定 https bitbucket org carljm django lockdown src 我已经安装了 我的代码如下所示 INSTALLED APPS lockd
  • jQuery 如何获取元素的边距和填充?

    只是想知道 如何使用 jQuery 我可以获得格式化的总填充和边距等元素 即 30px 30px 30px 30px 或 30px 5px 15px 30px 等 I tried var margT jQuery img css margi
  • StreamingFileSink 未将数据提取到 s3

    我创建了简单的摄取服务 该服务选择本地文件并使用 StreamingFileSink 摄取到 s3 https ci apache org projects flink flink docs stable dev connectors st