Flink CsvTableSource 流式传输

2023-12-20

我想使用 flink 流式传输 csv 文件并执行 sql 操作。但我编写的代码只读取一次并停止。它不流式传输。提前致谢,

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

CsvTableSource csvtable = CsvTableSource.builder()
    .path("D:/employee.csv")
    .ignoreFirstLine()
    .fieldDelimiter(",")
    .field("id", Types.INT())
    .field("name", Types.STRING())
    .field("designation", Types.STRING())
    .field("age", Types.INT())
    .field("location", Types.STRING())
    .build();

tableEnv.registerTableSource("employee", csvtable);

Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location");
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location");

DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);

//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class);

stream.print();
//stream1.print();

env.execute();

The CsvTableSource是基于一个FileInputFormat它逐行读取并解析引用的文件。结果行被转发到流式查询中。所以在CsvTableSource流式传输是指连续读取和转发行。但是,那CsvTableSource在文件末尾终止。因此,它发出有界流。

我假设您期望的行为是CsvTableSource读取文件直到其结尾,然后等待向文件追加写入。 然而,这并不是如何CsvTableSource作品。您需要实施自定义TableSource为了那个原因。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink CsvTableSource 流式传输 的相关文章

随机推荐

  • gridview 中的分页

    我的网格视图
  • 动态缩略图/调整图像大小生成[重复]

    这个问题在这里已经有答案了 可能的重复 PHP 动态调整图像大小与存储调整大小的图像 https stackoverflow com questions 2823600 php image resize on the fly vs stor
  • 未找到 glib.h 和 gtk.h

    大家好 我有一个程序 其中包括 gtk gtk h glib h 我使用过以下命令 sudo apt get install libgtk2 0 dev glib sudo apt get install glade 但我仍然收到 glib
  • 本机 xml 数据库中的唯一性插入

    我正在开发一个基于XML 的项目 我使用 Sedna 数据库来存储我的集合 其中包含 XML 文件及其 XSD 架构文件 我在这些方案中定义了主键 唯一键 但到目前为止我可以将重复值 通过 XQuery 更新插入命令 插入到主键字段中 为了
  • 内存泄漏和处置

    我可能不理解这个概念或者我做错了什么 我对 NET 中的内存管理有一些疑问 想象一下情况 Form1是大人窗体 作为 MDI 父级和一点FormChild 被绑定为子项 public partial class Form1 Form pub
  • 如何使用 @output 将布尔值从子级发送到父级

    嗨 有角的社区 我想使用 Output 触发一个事件来隐藏或打开 关闭包含其他组件的 div 这很简单 但我以前从未使用过 EventEmitter 所以我希望当调用 hideDem 时 它会根据来自 child ts 的其他属性隐藏或打开
  • 如何增加android中线性布局中元素之间的空间? [复制]

    这个问题在这里已经有答案了 在我的应用程序中 我使用了 Linearlayout 其中有 3 个 EditText 元素 现在我想增加 Edittext 元素之间的空间 填充 是否可以 基本上这是相对于你的边框而言的 如果你想在边框内的元素
  • 使用该 exe 的 VM 参数在 Maven 中创建一个 exe 文件

    我目前正在开发一个 Java 项目 该项目需要指定 java library path 的 VM 参数 当从 Eclipse 中运行我的程序时 没问题 我可以轻松指定我的 VM 参数 现在我想通过 Maven 构建我的项目 但还没有找到向该
  • 如何将二进制字符串写入文件 C#

    我有一个二进制数字符串 例如 temp 0101110011 我想将其另存为文件 该 Temp 有 10 个字符 如何将此字符串保存到 10 位长度的文件中 void Save Data string temp bool BoolArray
  • 使用 NSXMLParserDelegate 构建项目时出错

    TurbineXMLParser h import
  • VBA - 文件夹选择器 - 设置从哪里开始[重复]

    这个问题在这里已经有答案了 我有一个小型 Access VBA 应用程序 需要用户选择一个文件夹 我想知道是否有办法告诉 VBA 启动文件夹选择器的路径 即启动文件夹选择器C data forms 目前看来是从以前使用的目录开始的 还有一种
  • WebDriver / 将元素读入变量并重新使用它们

    我有一个大问题Webdriver 硒2 在我的测试代码中 我找到测试开始时的所有元素 并对它们执行一些操作 例如click 检查属性等 我的问题是我的页面刷新并重新加载我的元素 而 Webdriver 不知道再次识别这些元素 我知道我可以再
  • 即使调整窗口大小,也保持背景图像居中

    我有一个容器 div 另一个 div 居中 里面有背景图像 当我调整浏览器窗口大小时 我希望图像保持居中 即使浏览器窗口的宽度为smaller比图像的宽度 这是一些代码 CSS wrap width 100 height 357px bac
  • Git 推送在 TOTAL 上挂起

    Git 突然开始挂起PUSH命令 我 搜索了其他问题 但解决方案不起作用 我使用的是 ubuntu 12 04 Counting objects 18 done Delta compression using up to 2 threads
  • 我可以将 Snapchat SDK (SnapKit) 与 SwiftUI 结合使用吗?

    我正在尝试整合Snapkit https docs snapchat com 使用 iOS 应用程序 但我想使用 SwiftUI 而不是 UIKit 我已经使用 Snapkit 完成了所需的设置 现在我正在尝试让 Snapchat 登录按钮
  • R:数据点与高斯函数的稳健拟合

    我需要进行一些稳健的数据拟合操作 I have bunch of x y data that I want to fit to a Gaussian http en wikipedia org wiki Gaussian function
  • 在 init 中引发异常时如何防止泄漏?

    情况是这样的 假设我有一个名为 MYFoo 的类 这是它的初始化程序 init self super init if self during initialization something goes wrong and an except
  • Android 键盘消失时白色背景

    来自不同用户的问题视频 但内容相同 https i stack imgur com 2Jzov jpg https i stack imgur com 2Jzov jpg 我的背景图像设置如下 pane background image u
  • 您是否将存储库注入域对象中?

    经过一周每天 10 多个小时学习领域驱动设计后 我开始感觉自己已经开始很好地理解它了 直到今天读到这篇文章 http blog fedecarg com 2009 03 15 domain driven design the reposit
  • Flink CsvTableSource 流式传输

    我想使用 flink 流式传输 csv 文件并执行 sql 操作 但我编写的代码只读取一次并停止 它不流式传输 提前致谢 StreamExecutionEnvironment env StreamExecutionEnvironment g