Flink on Zeppelin-2

2023-11-19

Flink Interpreter类型

    首先介绍下Zeppelin中的Flink Interpreter类型。Zeppelin的Flink Interpreter支持Flink的所有API (DataSet, DataStream, Table API )。语言方面支持Scala,Python,SQL。下图是Zeppelin中支持的不同场景下的Flink Interpreter。





配置Flink Interpreter

       下图例举了所有重要的Flink配置信息,除此之外你还可以配置任意Flink的Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html



内置入口变量

      Flink Interpreter (%flink) 为用户自动创建了下面6个变量作为Flink Scala程序的入口。



  • senv (StreamExecutionEnvironment),

  • benv (ExecutionEnvironment)

  • stenv (StreamTableEnvironment for blink planner)

  • btenv (BatchTableEnvironment for blink planner)

  • stenv_2 (StreamTableEnvironment for flink planner)

  • btenv_2 (BatchTableEnvironment for flink planner)



      PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了6个python变量作为PyFlink程序的入口

  • s_env (StreamExecutionEnvironment),

  • b_env (ExecutionEnvironment)

  • st_env (StreamTableEnvironment for blink planner)

  • bt_env (BatchTableEnvironment for blink planner)

  • st_env_2 (StreamTableEnvironment for flink planner)

  • bt_env_2 (BatchTableEnvironment for flink planner)



Blink/Flink Planner

Flink 1.10中有2种table api的planner:flink & blink.



  • 如果你用DataSet api以及需要把DataSet转换成Table,那么就需要使用Flink planner的TableEnvironment (btenv_2 and stenv_2).

  • 其他场景下, 我们都会建议用户使用blink planner. 这也是Flink sql使用的planner(%flink.bsql & %flink.ssql)



使用Flink Batch SQL

      %flink.bsql 是用来执行Flink的batch sql. 运行 help 命令可以得到所有可用的命令





总的来说,Flink Batch SQL可以用来做2大任务:

  • 使用 insert into 语句来做 Batch ETL

  • 使用 select 语句来做BI 数据分析



基于Bank数据的Batch ETL

下面我们基于Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做Batch ETL任务。

  • 首先用Flink Sql创建一个raw 数据的source table,以及清洗干净后的sink table。







  • 然后再定义Table Function来parse raw data。





  • 接下来就可以用insert into语句来进行数据转换(source table --> sink table)





  •  用select语句来Preview最终数据,验证insert into语句的正确性





基于Bank数据的BI数据分析

经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的SQL Select语句进行分析,也可以使用Zeppelin的dynamic forms来增加交互性(TextBox,Select,Checkbox)





使用Flink UDF

      SQL虽然强大,但表达能力毕竟有限。有时候就要借助于UDF来表达更复杂的逻辑。Flink Interpreter 支持2种UDF (Scala + Python)。下面是2个简单的例子。



       Scala UDF

 

%flink

class ScalaUpper extends ScalarFunction {

def eval(str: String) = str.toUpperCase

}

btenv.registerFunction("scala_upper", new ScalaUpper())



    Python UDF



 

%flink.pyflink

class PythonUpper(ScalarFunction):

def eval(self, s):

return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))







对Hive数据的数据分析

     除了可以分析Flink SQL创建的table之外,Flink也可以分析Hive上已有的table。如果要让Flink Interpreter使用Hive,那么需要做以下配置

  • 设置 zeppelin.flink.enableHive 为 true

  • Copy 下面这些 dependencies 到flink的 lib 目录

  • flink-connector-hive_{scala_version}-{flink.version}.jar

  • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar

  • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar

  • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

  • 在Flink interpreter setting 里或者 zeppelin-env.sh里指定 HIVE_CONF_DIR

  • 在Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的Hive版本



下面就用一个简单的例子展示如何在Zeppelin中用Flink查询Hive table



1. 用Zeppelin的jdbc interpreter查询hive tables





2. 用Flink sql 查询 hive table的schema





3. 用Flink Sql 查询hive table





       本文只是简单介绍如何在Zeppelin中使用Flink SQL + UDF,关于更多Flink SQL和UDF请参考Flink官方文档

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink on Zeppelin-2 的相关文章

  • 关于游戏设计状态

    状态转移在数学里究竟是干嘛的我也不多说了 毕竟大家都是做游戏的 也不需要这么高深的数学知识 我就从一个实例开始讲一下吧 看不懂那我也没办法了 死套公式也行 只要调整下系数问题也不大 以武器强化为例 武器强化等级假如总共有十个等级 从一级开始

随机推荐

  • 数据结构----对称矩阵压缩存储中下标的计算

    一 压缩存储的概念 首先看一个对称矩阵 以深灰色为对称轴 由于矩阵内数据对称 因此只需将任意一边的数据存储起来即可 考虑到存储单元的线性结构 我们可以以一维数组的形式将其存储起来 需要存储的元素为 各个元素对应在一维数组中的位置示意图 按行
  • vue3+element+sortablejs实现table表格 行列动态拖拽

    vue3 element sortablejs实现table动态拖拽 1 第一步我们要安装sortablejs依赖 2 在我们需要的组件中引入 3 完整代码 4 效果 5 扩展 判断要拖动的行能不能拖动并放置到新位置 1 第一步我们要安装s
  • Promise {}

    Promise
  • 二叉树的链式结构实现

    文章目录 前言 链式结构实现 创建结点结构体 构建树逻辑结构 遍历二叉树 计算二叉树高度 结点数 叶子数 前言 对于一般的二叉树 非完全二叉树 满二叉树 而言 用顺序表去存储 会造成空间的浪费 所以一般采用链式结构实现 对于非完全非满二叉树
  • 前端Tabs表单的使用

  • 【codeforces #290(div 1)】ABC题解

    A Fox And Names time limit per test 2 seconds memory limit per test 256 megabytes input standard input output standard o
  • 08-10章

    第8章 函数 1 def 定义函数 2 形参 实参 3 位置实参 关键字实参 可给形参指定默认值 4 返回值 使用reture语句将值返回到调用函数的代码行 5 让实参变成可选的 将某个形参设置为空值 6 返回字典 7 向函数传递列表 在函
  • 学海无涯苦作舟

    作者在Twitter上发的一条短讯 每一天 你一定要一起床就热情澎湃 否则 你就只是在打工 3 51 PM 2012 5 1 在我们创办Stack Overflow网站之后 有些人开始承认 我们构造了一个还过得去的 捕鼠器 把大家都吸引到那
  • 拷贝构造函数 浅拷贝与深拷贝

    目录 拷贝构造函数 浅拷贝 深拷贝 拷贝构造函数 上一期中我们讲述了构造函数的相关内容 谈到构造函数在形式上有几种分类 即带参数的 不带参数的以及参数列表初始化的 还有一种传引用的构造函数 称为拷贝构造函数 顾名思义 就是起到拷贝的功能 通
  • 函数的防抖和节流简述

    防抖和节流 即 限制函数的执行次数 防抖和节流二者非常相似 但还是有细微的不同 防抖 通过 setTimeout 的方式在一定的时间间隔内 将多次触发变成一次触发 比如用户在十秒内一直连续点击 但最后只会触发一次 简单举例 function
  • FreeBSD用ports安装Firefox不成功 + 解决办法!

    今天在FreeBSD 5 4 Release中安装Firefox 先更新ports tree到最新版本 FreeBSD cvsup g L 2 root ports supfile 然后到Firefox的目录中去执行make FreeBSD
  • 在linux下为arm开发板交叉编译openssl报错arm-linux-gcc.br_real: error: unrecognized command line option '-m64'

    ubuntu 1804下安装树莓派的编译器后 在编译openssl时遇到下面问题 因此在网上找资料 参考网上一位大神介绍 原文链接如下 https blog csdn net trustbo article details 76851481
  • 打开Excle出现配置进度解决方法

    网上的最多的解决方案如果解决不了 想一下最近是不是安装或者卸载过WPS 以下是WPS导致配置进度的解决方案 安装Office之后 会发现每次打开excel都会出现一个配置进度的对话框 但是Word 和 PPT 都不会 这就说明你的电脑有安装
  • 【程序员面试金典】01.04. 回文排列

    回文排列 给定一个字符串 编写一个函数判定其是否为某个回文串的排列之一 回文串是指正反两个方向都一样的单词或短语 排列是指字母的重新排列 回文串不一定是字典当中的单词 示例 1 输入 tactcoa 输出 true 排列有 tacocat
  • MySQL经典50道练习题及全网最详细解析

    MySQL练习 文章目录 MySQL练习 50道经典SQL练习题全网最详细解析 数据表介绍 建表语句 插入数据 练习题目 1 查询 01 课程比 02 课程成绩高的学生的信息及课程分数 2 查询同时存在 01 课程和 02 课程的情况 3
  • TypeError: 'float' object is not callable

    今天在做一道Python练习题时遇到的问题 记录一下 请输入三个整数a b c 判断能否以它们为三个边长构成三角形 若能 输出YES和面积 否则输出NO 刚开始写的代码如下 a int input 请输入一个整数 b int input 请
  • Java实现图片上传返回上传地址

    关于在实际开发中最常用也是用的最多的Java实现文档 图片上传 一 准备阶段 文档 图片上传有几种方式 包括传统的ajax上传 云上传 这里给大家实现通过代码将图片上传至七牛云服务器并返回图片地址 1 需申请一台七牛云服务器地址 可免费试用
  • js删除数组里的某个元素

    删除数组中的某个元素 首先需要确定需要删除元素的索引值 var arr 1 5 6 12 453 324 function indexOf val for var i 0 i lt arr length i if arr i val ret
  • 请修改考试服务器名称,考试服务器ip数据库地址

    考试服务器ip数据库地址 内容精选 换一换 安全组是一个逻辑上的分组 为同一个虚拟私有云内具有相同安全保护需求 并相互信任的弹性云服务器和云数据库RDS实例提供访问策略 为了保障数据库的安全性和稳定性 在使用云数据库RDS实例之前 您需要设
  • Flink on Zeppelin-2

    Flink Interpreter类型 首先介绍下Zeppelin中的Flink Interpreter类型 Zeppelin的Flink Interpreter支持Flink的所有API DataSet DataStream Table