Flink从入门到真香(18、使用flink table api 从文件和kafka中读取数据)

2023-11-09

还是一样,要先引入依赖,在pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

从文件中读取数据

新建一个 TableApiFromFile的object:

/**
 *
 * @author mafei
 * @date 2020/11/22
 *
 *       把txt内容注册成为表,按照预期的格式输出出来
 */

package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, OldCsv, Schema}

object TableApiFromFile {
  def main(args: Array[String]): Unit = {
    //1 、创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)
    //2、读取文件
    val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) //因为txt里头是以,分割的跟csv一样,所以可以用oldCsv
      .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      ).createTemporaryTable("inputTable")

    val inputTable: Table = tableEnv.from("inputTable") //就是上面创建的表名
    inputTable.toAppendStream[(String, Long, Double)].print()
    tableEnv.execute("table api test from file")

  }
}

############ 上面的OldCsv已经废弃,如果要用新的方法,需要单独引入一个依赖在pom.xml中添加 #######################

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>

对应代码使用只需要把OldCsv方法改成Csv就可以了

.withFormat(new Csv())

代码结构及运行效果:


从kafka中读取数据

关于kafka的基础知识和安装可以参考: https://blog.51cto.com/mapengfei/1926065

/**
 *
 * @author mafei
 * @date 2020/11/22
 */

package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

object TableApiFromKafka {
  def main(args: Array[String]): Unit = {
    //1 、创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    //2、从kafka中读取数据
    tableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sensor1")
        .startFromLatest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ).withFormat(new Csv())
      .withSchema(new Schema() //这个表结构要跟你kafka中的内容对的上
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
    /**
     * 如果是json也是一样,需要引入单独的flink-json
     * <dependency>
     * <groupId>org.apache.flink</groupId>
     * <artifactId>flink-json</artifactId>
     * <version>1.10.1</version>
     * </dependency>
     */
      .createTemporaryTable("kafkaInputTable")

    val inputTable: Table = tableEnv.from("kafkaInputTable")
    inputTable.toAppendStream[(String,Long,Double)].print()
    env.execute("table api test from kafka! ")
  }
}

启动kafka服务,并开一个producer命令行,写入几条数据进去(注意是要符合预期的格式)
/opt/kafka_2.11-0.10.2.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor1

各种转换操作

目标: 从文件中读取数据,分别使用table api 和SQL这2种方式来实现过滤转换格式等操作

/**
 *
 * @author mafei
 * @date 2020/11/22
 */

package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object TableApiTransform {
  def main(args: Array[String]): Unit = {
    //1 、创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)
    //2、读取文件
    val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) //因为txt里头是以,分割的跟csv一样,所以可以用oldCsv
      .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      ).createTemporaryTable("inputTable")

    val inputTable: Table = tableEnv.from("inputTable") //就是上面创建的表名

    //3、查询转换
    //第一种,使用table api
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable
      //可以用scala里面的表达式来写
        .select('id,'temperature)
        .filter('id === "sensor1")
        //也可以这样子常规的写法
//        .select("id,temperature")
//        .filter("`id`='sensor1'")

    //第二种,使用SQL来实现
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id, temperature
        |from inputTable
        |where id='sensor1'
        |""".stripMargin
    )
    resultTable.toAppendStream[(String, Double)].print("table result:")
    resultSqlTable.toAppendStream[(String,Double)].print("sql result: ")

    inputTable.toAppendStream[(String, Long, Double)].print("原始的,没做任何处理:")
    tableEnv.execute("table api test from file")
  }

}

代码结构及运行效果

将DataStream转换成表

对于一个DataStream,可以直接转换成Table,进而方便的调用Table API做转换操作

val dataStream: DataStream[SensorReadingTest5] = ....
val sensorTable: Table = tableEnv.fromDataStream(dataStream)

默认转换后的Table scheam和DataStream中的字段定义一一对应,也可以单独指定出来
val dataStream: DataStream[SensorReadingTest5] = ...
val sensorTable = tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature)

数据类型与Schema的对应

DataStream中的数据类型,与表的Scheam之间的对应关系,可以有两种: 基于字段名称,或者字段的位置
基于名称(name-based)
val sensorTable = tableEnv.fromDataStream(dataStream,'timestamp as 'ts, 'id as 'myId, 'temperature)

基于位置(position-based)
val sensorTable  = tableEnv.fromDataStream(dataStream,'myId,'ts)

创建临时视图(Temporary View)

tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView",dataStream,'id, 'temperature, 'timestamp as 'ts)

基于Table创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable)

输出表

表的输出是通过将数据写入TableSink来实现的
TableSink是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列
输出表最直接的方法就是通过Table.insertinto()方法将一个Table写入注册过的TableSink中
tableEnv.connect(....).createTemporaryTable("outputTable")

val resultSqlTable: Table = .... resultTable.insertInto("outputTable")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink从入门到真香(18、使用flink table api 从文件和kafka中读取数据) 的相关文章

  • Eclipse - 安装新的 JRE (Java SE 8 1.8.0)

    我正在尝试安装 Java 8 到目前为止我所做的 安装最新版本的 Eclipse 下载并安装 Java SE 运行时环境 8http www oracle com technetwork java javase downloads jre8
  • Python HMAC:类型错误:字符映射必须返回整数、None 或 unicode

    我在使用 HMAC 时遇到了一个小问题 运行这段代码时 signature hmac new key secret key msg string to sign digestmod sha1 我收到一个奇怪的错误 File usr loca
  • 通过 appassembler-maven-plugin 生成的脚本无法在 Spring Boot 应用程序中找到主类

    我使用 appassembler maven plugin 生成的启动脚本有问题 我有一个基本的 spring boot 应用程序 只有一个类 SpringBootApplication public class ScriptDemoApp
  • 用于缓存的 Servlet 过滤器

    我正在创建一个用于缓存的 servlet 过滤器 这个想法是将响应主体缓存到memcached 响应正文由以下方式生成 结果是一个字符串 response getWriter print result 我的问题是 由于响应正文将不加修改地放
  • 如何从日期中删除毫秒、秒、分钟和小时[重复]

    这个问题在这里已经有答案了 我遇到了一个问题 我想比较两个日期 然而 我只想比较年 月 日 这就是我能想到的 private Date trim Date date Calendar calendar Calendar getInstanc
  • 如何通过 Android 按钮单击运行单独的应用程序

    我尝试在 Android 应用程序中添加两个按钮 以从单独的两个应用程序订单系统和库存系统中选择一个应用程序 如图所示 我已将这两个应用程序实现为两个单独的 Android 项目 当我尝试运行此应用程序时 它会出现直到正确选择窗口 但是当按
  • pandas 相当于 np.where

    np where具有向量化 if else 的语义 类似于 Apache Spark 的when otherwise数据帧方法 我知道我可以使用np where on pandas Series but pandas通常定义自己的 API
  • Play.application() 的替代方案是什么

    我是 Play 框架的新手 我想读取conf文件夹中的一个文件 所以我用了Play application classloader getResources Data json nextElement getFile 但我知道 play P
  • 是否需要关闭没有引用它们的文件?

    作为一个完全的编程初学者 我试图理解打开和关闭文件的基本概念 我正在做的一项练习是创建一个脚本 允许我将内容从一个文件复制到另一个文件 in file open from file indata in file read out file
  • 避免 Java 中的重复导入:继承导入?

    有没有办法 继承 导入 Example 常见枚举 public enum Constant ONE TWO THREE 使用此枚举的基类 public class Base protected void register Constant
  • Lombok @Builder 不创建不可变对象?

    在很多网站上 我看到 lombok Builder 可以用来创建不可变的对象 https www baeldung com lombok builder singular https www baeldung com lombok buil
  • 如何指示 urwid 列表框的项目数多于当前显示的项目数?

    有没有办法向用户显示 urwid 列表框在显示部分上方 下方有其他项目 我正在考虑类似滚动条的东西 它可以显示条目的数量 或者列表框顶部 底部的单独栏 如果这个行为无法实现 有哪些方法可以实现这个通知 在我的研究过程中 我发现这个问题 ht
  • 无法通过 Python 子进程进行 SSH

    我需要通过堡垒 ssh 进入机器 因此 该命令相当长 ssh i
  • AWS Lambda 不读取环境变量

    我正在编写一个 python 脚本来查询 Qualys API 中的漏洞元数据 我在 AWS 中将其作为 lambda 函数执行 我已经在控制台中设置了环境变量 但是当我执行函数时 出现以下错误 module initialization
  • Hadoop NoSuchMethodError apache.commons.cli

    我在用着hadoop 2 7 2我用 IntelliJ 做了一个 MapReduce 工作 在我的工作中 我正在使用apache commons cli 1 3 1我把库放在罐子里 当我在 Hadoop 集群上使用 MapReduceJob
  • Django 管理器链接

    我想知道是否有可能 如果可以的话 如何 将多个管理器链接在一起以生成受两个单独管理器影响的查询集 我将解释我正在研究的具体示例 我有多个抽象模型类 用于为其他模型提供小型的特定功能 其中两个模型是DeleteMixin 和GlobalMix
  • pandas 中数据帧中的随机/洗牌行

    我目前正在尝试找到一种方法来按行随机化数据框中的项目 我在 pandas 中按列洗牌 排列找到了这个线程 在 pandas 中对 DataFrame 进行改组 排列 https stackoverflow com questions 157
  • ArrayList.clear() 和 ArrayList.removeAll() 有什么区别?

    假如说arraylist定义为ArrayList
  • python从二进制文件中读取16字节长的双精度值

    我找到了蟒蛇struct unpack 读取其他程序生成的二进制数据非常方便 问题 如何阅读16 字节长双精度数出二进制文件 以下 C 代码将 1 01 写入二进制文件三次 分别使用 4 字节浮点型 8 字节双精度型和 16 字节长双精度型
  • 即使调整大小,如何获得屏幕的精确中间位置

    好的 这个问题有两部分 当我做一个JFrame 并在其上画一些东西 即使我将宽度设置为 400 并使其在一个项目击中它时 当然 允许项目宽度 它会反弹回来 但由于某种原因 它总是偏离屏幕约 10 个像素 有没有办法解决这个问题 或者我只需要

随机推荐

  • #pragma once 是什么意思?

    和头文件中用 ifndef A H define A H Here is code endif 效果类似 包含pragma once语句的文件只会被编译一次 表示在编译的时候 这个文件只被包含 include 一次 这样 可以减少整个编译过
  • PHP框架的基本原理以及选择标准

    PHP框架的原理 说到PHP框架 可能很多PHP新手会感到有些胆怯 其实 PHP框架也不是那么深不可测的 框架就是别人使用PHP基础只是为你写好了的东西 只是封装在一起 这就好比我们使用PHP的函数 函数都是已近写好了的 我们只要按照函数使
  • 图解LeetCode——1812. 判断国际象棋棋盘中一个格子的颜色(难度:简单)

    一 题目 给你一个坐标 coordinates 它是一个字符串 表示国际象棋棋盘中一个格子的坐标 下图是国际象棋棋盘示意图 如果所给格子的颜色是白色 请你返回 true 如果是黑色 请返回 false 给定坐标一定代表国际象棋棋盘上一个存在
  • C/C++

    文章目录 常见面试题目讲解 宏定义 数据声明 类型修饰符的使用总结 位操作 访问固定内存位置 参考 麦子学院 嵌入式C语言高级 C语言函数的使用 常见面试题目讲解 参考 嵌入式程序员应该知道的0x10个基本问题 常见面试题目讲解 宏定义 1
  • Java设计模式——装饰者模式

    装饰者模式 一 概述 装饰者模式 装饰器模式 是一种结构型模式 定义 在不改变现有对象结构的情况下 动态地给该对象增加一些额外职责 功能 的模式 装饰者 Decorator 模式中的角色 抽象构件 Component 角色 定义一个抽象接口
  • 7-44 求整数的位数及各位数字之和

    对于给定的正整数N 求它的位数及其各位数字之和 输入格式 输入在一行中给出一个不超过109的正整数N 输出格式 在一行中输出N的位数及其各位数字之和 中间用一个空格隔开 输入样例 321 输出样例 3 6 include
  • Tomcat流程图分析

    org apache catalina startup Bootstrap 启动类 初始化步骤 从server开始到service connector 后实现了lifecycle 接口 bootstrape init gt catelina
  • Protobuf下载和编译

    系列导航 一 Protobuf下载和编译 二 Protobuf在Java中的简单使用 一 简介 protobuf全称Google Protocol Buffers 是google开发的的一套用于数据存储 网络通信时用于协议编解码的工具库 是
  • C#中导出百万级Excel只需几秒除了NPOI还可以这样

    场景 Winform中通过NPOI导出Excel的三种方式 HSSFWorkbook XSSFWorkbook SXSSFWorkbook 附代码下载 https blog csdn net BADAO LIUMANG QIZHI arti
  • 剪格子 蓝桥杯 211

    题目描述 如下图所示 3 x 3 的格子中填写了一些整数 我们沿着图中的红色线剪开 得到两个部分 每个部分的数字和都是 60 本题的要求就是请你编程判定 对给定的 m n 的格子中的整数 是否可以分割为两个部分 使得这两个区域的数字和相等
  • com.alibaba.fastjson.JSONArray cannot be cast to com.alibaba.fastjson.JSONObject

    json中类型转换问题 是错误的格式 例 JSONObject parseObject type slider show true start 1 end 100 正确的写法 JSONObject dataZoom new JSONObje
  • C# 委托(delegate)

    1 什么是委托 委托是一种引用类型 它是函数指针的托管版本 在C 中 委托是一种可以把引用存储为函数的类型 委托可以引用实例和静态方法 而函数指针只能引用静态方法 委托的声明非常类似于函数 和函数不同的的是委托不带函数体 并且需要Deleg
  • 初识Spring Boot

    目录 一 Spring Boot是什么 二 创建Spring Boot项目 1 使用IDEA创建 2 网页版创建 三 运行项目 一 Spring Boot是什么 简单来说Spring Boot就是Spring的 脚手架 就是一个框架 Spr
  • nodejs libuv学习

    读了一下libuv源代码 简单记录一些见解 https github com libuv libuv libev就是一个基于epoll封装事件的函数库 自身不带有线程池等操作 而libuv则是在libev基础上 加上线程操作的功能 大体运作
  • Java中Array.sort()的几种用法

    转载https www tuicool com articles iii6N3 Java的Arrays类中有一个sort 方法 该方法是Arrays类的静态方法 在需要对数组进行排序时 非常的好用 但是sort 的参数有好几种 下面我就为大
  • 【QT控件大小自适应窗口变化】

    问题 刚开始学习QT时 在窗口中放置一个个控件 而后运行程序 会发现改变窗口大小时 控件大小不随窗口大小变化而变化 导致窗口大小变化没意义 同时也让精心布局看起来很难看 本文提供一种使用BoxLayout中放置控件 所有可见控件能够随窗口大
  • 同仁堂-十大王牌、十大名药

    同仁堂 十大王牌 十大名药 官网 ZY123 com 中医123
  • WPS中编辑Word删除内容之后保存退出了如何恢复?

    目录 一 问题简述 二 Word用户 场景一 情况一 删除了内容没有退出文档 情况二 删除了内容退出文档 情况三 删除了文件退出文档 三 Wps用户 场景二 情况一 删除了内容没有退出文档 情况二 删除了内容退出文档 情况三 删除了文件退出
  • PAT 5 剪邮票

    剪邮票 如 图1 jpg 有12张连在一起的12生肖的邮票 现在你要从中剪下5张来 要求必须是连着的 仅仅连接一个角不算相连 比如 图2 jpg 图3 jpg 中 粉红色所示部分就是合格的剪取 请你计算 一共有多少种不同的剪取方法 请填写表
  • Flink从入门到真香(18、使用flink table api 从文件和kafka中读取数据)

    还是一样 要先引入依赖 在pom xml