FlinkTable时间属性

2023-05-16

像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

一、处理时间

1. DataStream 到 Table 转换时定义

处理时间属性可以在schema定义的时候用.proctime后缀来定义。(处理)时间属性一定不能定义在一个已有字段上,所以它只能定义在schema定义的最后。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStreamSource<WaterSensor> waterSensorStream =

env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

// 1. 创建表的执行环境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 声明一个额外的字段来作为处理时间字段

Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());

sensorTable.execute().print();

2. 在创建表的 DDL 中定义

package com.bigdata.flink.java.chapter_11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.TableResult;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink06_TableApi_ProcessTime {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 1. 创建表的执行环境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 创建表, 声明一个额外的列作为处理时间

tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("

+ "'connector' = 'filesystem',"

+ "'path' = 'input/sensor.txt',"

+ "'format' = 'csv'"

+ ")");

TableResult result = tableEnv.executeSql("select * from sensor");

result.print();

}

}

 

二、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

1. DataStream 到 Table 转换时定义

事件时间属性可以用.rowtime后缀在定义DataStream schema 的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了。

在从DataStream到Table转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

在 schema 的结尾追加一个新的字段。

替换一个已经存在的字段。

不管在哪种情况下,事件时间字段都表示DataStream中定义的事件的时间戳。

package com.bigdata.flink.java.chapter_11;

import com.bigdata.flink.java.chapter_5.WaterSensor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class Flink07_TableApi_EventTime {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator<WaterSensor> waterSensorStream = env

.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60))

.assignTimestampsAndWatermarks(

WatermarkStrategy

.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))

.withTimestampAssigner((element, recordTimestamp) -> element.getTs())

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table table = tableEnv

// 用一个额外的字段作为事件时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("et").rowtime());

table.execute().print();

env.execute();

}

}

// 使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));;

2. 在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

package com.bigdata.flink.java.chapter_11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_TableApi_EventTime_2 {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 作为事件时间的字段必须是 timestamp(3) 类型, 所以根据 long 类型的 ts 计算出来一个 t

tEnv.executeSql("create table sensor(" +

"id string," +

"ts bigint," +

"vc int, " +

"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +

"watermark for t as t - interval '5' second)" +

"with("

+ "'connector' = 'filesystem',"

+ "'path' = 'input/sensor.txt',"

+ "'format' = 'csv'"

+ ")");

tEnv.sqlQuery("select * from sensor").execute().print();

}

}

说明:

1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

2.严格递增时间戳:WATERMARK FOR rowtime_column AS rowtime_column。

3.递增时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。

4.乱序时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。

5.当发现时区所导致的时间问题时,可设置本地使用的时区:

Configuration configuration = tableEnv.getConfig().getConfiguration();

configuration.setString("table.local-time-zone", "GMT");

总结

Flink处理时间属性可以在schema定义的时候用.proctime后缀来定义。(处理)时间属性一定不能定义在一个已有字段上,所以它只能定义在schema定义的最后。Flink事件时间属性可以用.rowtime后缀在定义DataStream schema 的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FlinkTable时间属性 的相关文章

  • QML中使用QPen设置组件边框(以CandlestickSeries为例)

    QML中提供了很多组件是由C 43 43 中绘制而注册到QML中使用的 xff0c 这些组件采用QPen绘制组件边框 xff0c 这样的组件均可使用返回QPen的方法设置边框样式 例如CandlestickSeries就提供了数据展示的蜡烛
  • Qt QML 自绘GPS方位校北仪控件

    校北仪用于显示不同设备与参照方位之间的误差夹角 xff0c 如果仅仅使用柱状图显示多个不同设备误差的数值 xff0c 数据不够直观表示 xff0c 因此自己画一个 xff0c 效果如图 xff1a 该控件使用QQuickPaintedIte
  • QML 地图修改插件源码(三),Map在Plugin中设置加载地图类型

    常用的地图种类分为交通图 xff0c 地形图 xff0c 卫星图等等 xff0c 在QML的Map xff08 以OSM地图为例 xff09 中提供activeMapType属性用于读取当前显示的地图类型 xff08 注意 xff1a 该属
  • QML 地图修改插件源码(四),Map根据目录作为索引加载地图瓦片

    QML中的地图 以OSM为例 在使用过程中会发现当地图层数很多时 xff0c 特别是如果使用离线地图且地图层级较多时地图会变得很卡 xff08 在线地图加载的层级数多且不清除缓存时也会卡 xff09 xff0c 原因在于QML地图插件对地图
  • Qt Quick 3D中将3D场景(如.obj)转换为.mesh

    Qt 5 15以后提供了Qt Quick 3D的新模块 xff0c 该模块可使用可视化的方式即可进行3D模型的显示 xff0c 并且相较于Qt 3D来说 xff0c 该模块的API更为高级 xff0c 使用者不需要自己设计底层的细节 该模块
  • Qt Quick 3D系列(一):加载3d模型

    如果我们想在QML中使用3D且你之前没有三维程序开发的基础 xff0c 使用Qt Quick 3D是个不错的选择 xff0c 下面我介绍如何使用Qt Quick 3D加载3d模型 注意 xff1a Qt Quick 3D从Qt 5 15之后
  • boa常见错误整理

    错误1 xff1a bison y d boa grammar y gcc g O2 pipe Wall I c o y tab o y tab c y tab c In function yyparse y tab c 1295 warn
  • Qt Quick 3D系列(二):鼠标控制3D模型旋转缩放

    上一篇文章Qt Quick 3D系列 xff08 一 xff09 xff1a 加载3d模型我们讲述了如何在Qt Quick 3D中显示一个3D模型 xff0c 那么显示了3D模型后如何使用鼠标进行旋转呢 xff1f 在Qt 3D中提供了Or
  • Qt Quick 3D系列(三):设置三维模型的金属光泽材质

    前面的博客中介绍了如何在Qt Quick 3D中加载三维模型 xff0c 下面介绍如何设置三维模型的材质 xff0c 例如下图模型 我需要设置为金属材质时 xff0c 设置该Model的materials为PrincipledMateria
  • Qt Quick 3D系列(四):为三维模型添加动画效果

    Qt Quick 3D旨在让那些熟悉QML开发的人能够使用非常少的学习成本实现3D开发 xff0c 因此对于三维模型动画没有类似与Qt 3D中有个专门的Qt3DAnimation来控制动画效果 xff0c 而是直接使用QML中的Animat
  • Qt Quick 3D系列(五):三维模型展示示例

    为了展示一个好看的3D模型 xff0c 需要对模型设置背景 xff0c 设置三维材质 xff0c 设置周围光线 xff0c 设置模型阴影等等 下面示例展示了一个比较好看的三维模型效果 xff0c 大家可以先在C4D等调整好模型效果 xff0
  • QML 地图可拖拽位置标签组件

    在地图上显示位置信息时 xff0c 有时候需要同时显示该位置的详细信息 该组件可在地图上显示一个连接到地图地理位置的标签框 xff0c 该标签框可点击进行拖拽 在地理位置改变 地图缩放 地图平移时 xff0c 该标签框的相对位置保持不变 x
  • 使用QQuaternion对Qt Data Visualization中模型进行旋转

    在Data Visualization中 xff0c 三维显示的OBJ需要旋转时使用rotation属性 xff0c 但是该属性传入的值是一个四元数QQuaternion xff0c 直接赋值四元数很复杂 xff0c 因此使用转化的方式获得
  • QML 可拖拽边框和顶点调整大小组件(新增对主窗口支持)

    QML项目开发过程中 xff0c 有时候需要对控件大小和位置 进行人为调整 xff0c 因此设计该组件 该组件鼠标置于边框和顶点位置时鼠标样式对应改变 xff0c 拖动边框可修改该方向组件大小 xff0c 拖动顶点可修改组件处横纵向组件大小
  • QML 地图修改插件源码(五),Map添加自定义地图类型,并动态修改地图类型

    QML的地图Map中提供了属性activeMapType MapType用于设置当前地图的类型 xff0c 以OSM地图插件为例 xff0c OSM地图提供了多种地图类型 xff0c 下面介绍如何修改OSM插件的源码添加需要的地图类型 xf
  • Qt使用QQuaternion对空间矢量QVector3D进行旋转

    空间中的QVector3D既可以代表空间中的点位置 xff0c 也可以表示空间矢量 为什么要对空间矢量进行旋转呢 xff0c 比如有一个空间矢量在空间中代表了镜头前进的方向 xff08 即第一人称模式 xff09 xff0c 初始时该矢量指
  • QML实现双屏显示

    QML程序中需要分别在主屏幕和分屏幕上显示不同的界面内容 xff0c 但又为了不同界面间能够进行数据交互 xff0c 因此使用如下方法实现双屏显示 xff0c 即由主窗口生成第二个窗口 xff0c 将该窗口移动到第二个屏幕上 xff0c 实
  • ubuntu设置默认内核启动的方法

    本文介绍ubuntu设置默认内核启动的方法 参考如下图 xff0c 修改grub文件 xff1a 修改后 xff0c 执行 xff1a sudo update grub amp amp reboot
  • QML自定义的日历控件

    QML中提供了日历的控件Calendar xff0c 但该控件为QtQuick Controls 1中提供的控件 xff0c 因此只能使用QtQuick Controls Styles的方式对该控件进行设置 xff0c 效果如图 xff1a

随机推荐

  • QML地图Map中使用QPainterPath,并显示任意点经纬度位置

    QML地图Map中提供了供绘制图形的组件 xff0c 例如MapPolyline xff0c MapCircle等 xff0c 但是这些组件在绘制复杂轨迹时就显得功能不够全面 xff0c 因此我将QPainterPath在Map中进行使用并
  • QML地图绘制虚线

    QML提供了MapPolyline用于在地图上绘制线段 xff0c 该线段是实线 xff0c 因此我使用Canvas自定义绘制的方式在地图上绘制线段 xff0c 如图 xff1a 鼠标在地图上点击后 xff0c 在点击位置添加图标 xff0
  • QML无边框最大化窗口时遮住了任务栏,程序默认置顶的问题

    在QML窗口使用无边框 xff0c 并且默认程序最大化显示后 xff0c 此时程序显示默认自动变成了全屏显示 xff0c 程序自动遮挡住了系统任务栏 当使用多屏幕显示时 xff0c 切换不同程序 xff0c 该QML程序的界面显示错误 使用
  • 用VScode写C/C++,从下载安装到配置使用

    介绍 编程的过程大致分为编写代码 代码编译 代码执行三步 xff0c vscode可以完成代码编写 xff0c 但是不能进行编译 也就是将代码翻译为计算机可以听懂的话 xff0c MinGW可以完成这个任务 xff0c 二者配合可以实现在v
  • 深度学习训练降低显存指南

    一 小模块API参数inplace设置为True xff08 省一点点 xff09 比如 xff1a Relu 有一个默认参数inplace xff0c 默认设置为False xff0c 当设置为True时 xff0c 计算时的得到的新值不
  • C++工程师学习内容

    C 43 43 是最贴近底层编程语言 在性能方面上 xff0c 有着无可替代的优势特别是对于很多游戏开发公司来说 xff0c C 43 43 尤其适合作为后端服务的开发语言 在一些对于并发性能要求较高的业务上 xff0c C 43 43 也
  • Ubuntu 更新apt出错

    输入sudo apt get update后出现 Err 1 http us archive ubuntu com ubuntu xenial InRelease Temporary failure resolving 39 us arch
  • 使用OpenWrt开发嵌入式Linux(二):先让系统跑起来(使用initramfs)

    安装相关工具 推荐使用ubuntu 16及以上版本 sudo apt install gcc binutils bzip2 flex python perl make diffutils unzip gawk subversion zlib
  • 使用kubeadm从0到1搭建kubernete集群

    目录 概述 安装前提示 安装docker 安装kubeadm 安装kubernete集群master节点 安装 kubeadm kubectl kubelet组件 安装kubernete master节点 安装CNI网络插件 部署集群wor
  • shell基础之变量(2):变量有哪些种类、怎么定义/赋值/取值、不同种类变量的作用域

    通过本文能对shell变量有一个系统性的了解 xff0c 具体的包括 xff1a 变量的种类 xff1a 局部 全局 环境变量变量的定义和操作 xff1a 赋值 取值 取消变量变量的作用域 文章目录 一 变量的种类1 全局变量2 局部变量
  • java 泛型全解 - 绝对最详细

    背景 对于java的泛型我一直属于一知半解的 xff0c 平常真心用的不多 直到阅读 Effect Java 看到很多平常不了解的用法 xff0c 才下定决心 xff0c 需要系统的学习 xff0c 并且记录下来 1 泛型的概述 xff1a
  • Zookeeper数据同步流程

    在服务器启动阶段 xff0c 会进行磁盘数据的恢复 xff0c 完成数据恢复后就会进行Leader选举 一旦选举产生Leader服务器后 xff0c 就立即开始进行集群间的数据同步 xff0c 在整个过程中 xff0c Zookeeper都
  • JS中Ajax的方法和应用

    XMLHttpRequest对象 Ajax技术的核心是XMLHttpRequest对象 xff08 简称XHR xff09 这是有微软率先引入的一个特性 xff0c 其他浏览器提供商后来都提供了相同的实现 但因为IE的兼容性问题 xff0c
  • node.js安装及环境配置

    一 下载nodejs的安装包 xff1a 下载地址 xff1a https nodejs org zh cn download 根据自己电脑系统及位数选择 xff0c 一般都选择windows64位 msi格式安装包 网站上提供的安装包版本
  • 6个常用的React组件库

    Ant Design 项目链接 xff1a Ant Design 包大小 xff08 来自 BundlePhobia xff09 xff1a 缩小后 1 2mB xff0c 缩小 43 gzip 压缩后 349 2kB xff0c 通过摇树
  • 大数据培训课程数据清洗案例实操-简单解析版

    数据清洗 xff08 ETL xff09 在运行核心业务MapReduce程序之前 xff0c 往往要先对数据进行清洗 xff0c 清理掉不符合用户要求的数据 清理的过程往往只需要运行Mapper程序 xff0c 不需要运行Reduce程序
  • 宋红康2023版Java视频发布

    1500万 43 播放量见证经典 xff0c 尚硅谷宋红康老师的Java入门视频堪称神作 xff0c 如今经典再次超级进化 xff0c 新版Java视频教程震撼来袭 xff01 开发环境全新升级 xff1a JDK17 43 IDEA202
  • Java消息队列:消息在什么时候会变成Dead Letter?

    在较为重要的业务队列中 xff0c 确保未被正确消费的消息不被丢弃 xff0c 通过配置死信队列 xff0c 可以让未正确处理的消息暂存到另一个队列中 xff0c 待后续排查清楚问题后 xff0c 编写相应的处理代码来处理死信消息 一 什么
  • Vue2和Vue3数据双向绑定原理的区别及优缺点(下篇)

    上篇我们讲到了Vue2的数据双向绑定原理 xff0c 如果你没有阅读上篇 xff0c 建议先阅读一下上篇中的内容 Vue2和Vue3数据双向绑定原理的区别及优缺点 xff08 上篇 xff09 在上篇中我们抛出了一个问题 xff1a 是不是
  • FlinkTable时间属性

    像窗口 xff08 在 Table API 和 SQL xff09 这种基于时间的操作 xff0c 需要有时间信息 因此 xff0c Table API 中的表就需要提供逻辑时间属性来表示时间 xff0c 以及支持时间相关的操作 一 处理时