SeaTunnel 学习笔记

2023-05-16

第1章 Seatunnel概述

官网地址:https://seatunnel.apache.org/
文档地址:https://interestinglab.github.io/seatunnel-docs/#/

1.1 SeaTunnel是什么

SeaTunnel是一个简单易用,高性能,能够应对海量数据的数据处理产品。

SeaTunnel的前身是Waterdrop(中文名:水滴)自2021年10月12日更名为SeaTunnel。2021年12月9日,SeaTunnel正式通过Apache软件基金会的投票决议,以全票通过的优秀表现正式成为Apache孵化器项目。

1.2 SeaTunnel在做什么

本质上,SeaTunnel不是对Saprk和Flink的内部修改,而是在Spark和Flink的基础上做了一层包装。它主要运用了控制反转的设计模式,这也是SeaTunnel实现的基本思想。

SeaTunnel的日常使用,就是编辑配置文件。编辑好的配置文件由SeaTunnel转换为具体的Spark或Flink任务。如图所示。
在这里插入图片描述

1.3 SeaTunnel的应用场景

SeaTunnel适用于以下场景:
海量数据的同步
海量数据的集成
海量数据的ETL
海量数据聚合
多源数据处理

SeaTunnel的特点:
基于配置的低代码开发,易用性高,方便维护。
支持实时流式传输
离线多源数据分析
高性能、海量数据处理能力
模块化的插件架构,易于扩展
支持用SQL进行数据操作和数据聚合
支持Spark structured streaming
支持Spark 2.x

目前SeaTunnel的长板是他有丰富的连接器,又因为它以Spark和Flink为引擎。所以可以很好地进行分布式的海量数据同步。通常SeaTunnel会被用来做出仓入仓工具,或者被用来进行数据集成

1.4 SeaTunnel的工作流程

在这里插入图片描述

1.5 SeaTunnel目前的插件支持

1.5.1 Spark连接器插件(Source)

Spark连接器插件数据库类型SourceSink
BatchFake
ElasticSearch
File
Hive
Hudi
Jdbc
MongoDB
Neo4j
Phoenix
Redis
Tidb
Clickhouse
Doris
Email
Hbase
Kafka
Console
Kudu
Redis
StreamFakeStream
KafkaStream
SocketSTream

1.5.2 Flink 连接器插件(Source)

Flink连接器插件数据库类型SourceSink
Druid
Fake
File
InfluxDb
Jdbc
Kafka
Socket
Console
Doris
ElasticSearch

1.5.3 Spark & Flink 转换插件

转换插件SparkFlink
Add
CheckSum
Convert
Date
Drop
Grok
Json
Kv
Lowercase
Remove
Rename
Repartition
Replace
Sample
Split
Sql
Table
Truncate
Uppercase
Uuid

第2章 Seatunnel安装和使用

2.1 SeaTunnel的环境依赖

Java版本需要>=1.8
SeaTunnel支持Spark 2.x(尚不支持Spark 3.x)。支持Flink 1.9.0及其以上的版本。

2.2 SeaTunnel的下载和安装

去官网下载解压即可

2.3 SeaTunnel的依赖环境配置

在config/目录中有一个seatunnel-env.sh脚本
在这里插入图片描述
修改为自己的spark或者flink路径即可

2.4 示例1: SeaTunnel 快速开始

官方的flink案例
1.选择任意路径,创建一个文件。这里我们选择在SeaTunnel的config路径下创建一个example01.conf

vim example01.conf

2.在文件中编辑如下内容

# 配置Spark或Flink的参数
env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://node1:9092/checkpoint"
}
# 在source所属的块中配置数据源
source {
    SocketStream{
          host = node1
          result_table_name = "fake"
          field_name = "info"
    }
}
# 在transform的块中声明转换插件
transform {
  Split{
    separator = "#"
    fields = ["name","age"]
  }
  sql {
sql = "select info, split(info) as info_row from fake"
}
}
# 在sink块中声明要输出到哪
sink {
  ConsoleSink {}
}

3.开启flink集群

bin/start-cluster.sh

4.开启一个netcat服务来发送数据

nc -lk 9999

5.使用SeaTunnel来提交任务

bin/start-seatunnel-flink.sh --config config/example01.conf

6.在netcat上发送数据
在这里插入图片描述

7.在Flink webUI上查看输出结果
在这里插入图片描述

第3章 SeaTunnel基本原理

3.1 SeaTunnel的启动脚本

截至目前,SeaTunnel有两个启动脚本。
提交spark任务用start-seatunnel-spark.sh。
提交flink任务则用start-seatunnel-flink.sh。

start-seatunnle-flink.sh可以指定3个参数
分别是:
–config参数用来指定应用配置文件的路径

–variable参数可以向配置文件传值。配置文件内是支持声明变量的。然后我们可以通过命令行给配置中的变量赋值。
变量声明语法如下

sql {
    sql = "select * from (select info,split(info) from fake) where age > '"${age}"'"
  }

在配置文件的任何位置都可以声明变量。并用命令行参数–variable key=value的方式将变量值传进去,你也可以用它的短命令形式 -i key=value。传递参数时,key需要和配置文件中声明的变量名保持一致。

如果需要传递多个参数,那就在命令行里面传递多个-i或–variable key=value。

bin/start-seatunnel-flink.sh --config/xxx.sh -i age=18 -i sex=man

–check参数用来检查config语法是否合法(check功能还尚在开发中,因此–check参数是一个虚设)

3.2 SeaTunnel的配置文件

3.2.1 应用配置的4个基本组件

一个完整的SeaTunnel配置文件应包含四个配置组件。分别是:
env{} source{} --> transform{} --> sink{}
在这里插入图片描述
在Source和Sink数据同构时,如果业务上也不需要对数据进行转换,那么transform中的内容可以为空。具体需根据业务情况来定。

3.2.2 SeaTunnel中的核心数据结构Row

Row是SeaTunnel中数据传递的核心数据结构。对flink来说,source插件需要给下游的转换插件返回一个DataStream,转换插件接到上游的DataStream进行处理后需要再给下游返回一个DataStream。最后Sink插件将转换插件处理好的DataStream输出到外部的数据系统。
在这里插入图片描述
因为DataStream可以很方便地和Table进行互转,所以将Row当作核心数据结构可以让转换插件同时具有使用代码(命令式)和sql(声明式)处理数据的能力。

3.2.3 env块

env块中可以直接写spark或flink支持的配置项。比如并行度,检查点间隔时间。检查点hdfs路径等。在SeaTunnel源码的ConfigKeyName类中,声明了env块中所有可用的key。如图所示:
在这里插入图片描述

3.2.4 source块

source块是用来声明数据源的。source块中可以声明多个连接器。比如:

# 伪代码
env {
    ...
}

source {
  hdfs { ... }  
  elasticsearch { ... }
  jdbc {...}
}

transform {
    sql {
     sql = """
        select .... from hdfs_table 
        join es_table 
        on hdfs_table.uid = es_table.uid where ..."""
    }
}

sink {
    elasticsearch { ... }
}

需要注意的是,所有的source插件中都可以声明result_table_name。如果你声明了result_table_name。SeaTunnel会将source插件输出的DataStream转换为Table并注册在Table环境中。当你指定了result_table_name,那么你还可以指定field_name,在注册时,给Table重设字段名。

3.2.5 transform块

transform{}块中可以声明多个转换插件。所有的转换插件都可以使用source_table_name,和result_table_name。同样,如果我们声明了result_table_name,那么我们就能声明field_name。

目前可用的插件总共有两个,一个是Split,另一个是sql。
在这里插入图片描述
Split插件并没有对数据流进行任何的处理,而是将它直接return了。反之,它向表环境中注册了一个名为split的UDF(用户自定义函数)。而且,函数名是写死的。

指定soure_table_name对于sql插件的意义不大。因为sql插件可以通过from子句来决定从哪个表里抽取数据。

3.2.6 sink块

Sink块里可以声明多个sink插件,每个sink插件都可以指定source_table_name。不过因为不同Sink插件的配置差异较大,所以在实现时建议参考官方文档。

3.3 SeaTunnel的基本原理

在这里插入图片描述
1.程序会解析你的应用配置,并创建环境
2.配置里source{},transform{},sink{}三个块中的插件最终在程序中以List集合的方式存在。
3.由Excution对象来拼接各个插件,这涉及到选择source_table,注册result_table等流程,注册udf等流程。并最终触发执行

3.4 小结

用一张图将SeaTunnel中的重要概念串起来
在这里插入图片描述
如果你不指定source_table_name,插件会使用它在配置文件上最近的上一个插件的输出作为输入。

第4章 应用案例

4.1 flink通过JDBC方式读取hive数据

这个已经在2.12版本里面启用,将hive-jdbc-3.1.2-standalone.jar放入flink的lib中

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://node1:9092/checkpoint"
}
# 在source所属的块中配置数据源
source {
     JdbcSource {
         driver = org.apache.hive.jdbc.HiveDriver
         url = "jdbc:hive2://node1:10000"
         username = hive
         query = "select * from yes.student"
     }
}
# 在transform的块中声明转换插件
transform {
}
# 在sink块中声明要输出到哪
sink {
  ConsoleSink {}
}

4.2 Kafka进Kafka出的简单ETL

对test_csv主题中的数据进行过滤,仅保留年龄在18岁以上的记录

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
}

# 在source所属的块中配置数据源
source {
    KafkaTableStream {
        consumer.bootstrap.servers = "node1:9092"
        consumer.group.id = "seatunnel-learn"
        topics = test_csv
        result_table_name = test
        format.type = csv
        schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\", \"type\": \"int\"}]"
        format.field-delimiter = ";"
        format.allow-comments = "true"
        format.ignore-parse-errors = "true"
    }
}
# 在transform的块中声明转换插件
transform {

  sql {
    sql = "select name,age from test  where age > '"${age}"'"
  }
}
# 在sink块中声明要输出到哪
sink {
   kafkaTable {
    topics = "test_sink"
    producer.bootstrap.servers = "node1:9092"
        }
}

启动任务

bin/start-seatunnel-flink.sh --config config/example03.conf -i age=18

4.3 Kafka 输出到Doris进行指标统计

使用回话日志统计用户的总观看视频数,用户最常会话市场,用户最小会话时长,用户最后一次会话时间。

create database test_db;
CREATE TABLE `example_user_video` (
  `user_id` largeint(40) NOT NULL COMMENT "用户id",
  `city` varchar(20) NOT NULL COMMENT "用户所在城市",
  `age` smallint(6) NULL COMMENT "用户年龄",
  `video_sum` bigint(20) SUM NULL DEFAULT "0" COMMENT "总观看视频数",
  `max_duration_time` int(11) MAX NULL DEFAULT "0" COMMENT "用户最长会话时长",
  `min_duration_time` int(11) MIN NULL DEFAULT "999999999" COMMENT "用户最小会话时长",
  `last_session_date` datetime REPLACE NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次会话时间"
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `city`, `age`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
;   
env {
	execution.parallelism = 1
}

source {
    KafkaTableStream {
	    consumer.bootstrap.servers = "node1:9092"
	    consumer.group.id = "seatunnel5"
	    topics = test
	    result_table_name = test
	    format.type = json
	    schema = "{\"session_id\":\"string\",\"video_count\":\"int\",\"duration_time\":\"long\",\"user_id\":\"string\",\"user_age\":\"int\",\"city\":\"string\",\"session_start_time\":\"datetime\",\"session_end_time\":\"datetime\"}"
	    format.ignore-parse-errors = "true"
	}
}

transform{
	sql {
		sql = "select user_id,city,user_age as age,video_count as video_sum,duration_time as max_duration_time,duration_time as min_duration_time,session_end_time as last_session_date from test"
		result_table_name = test2
	}
}

sink{
	DorisSink {
		source_table_name = test2
    	fenodes = "node1:8030"
    	database = test_db
    	table = example_user_video
    	user = atguigu
    	password = 123321
    	batch_size = 50
    	doris.column_separator="\t"
    	doris.columns="user_id,city,age,video_sum,max_duration_time,min_duration_time,last_session_date"
	}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SeaTunnel 学习笔记 的相关文章

  • 51单片机通过两个按键控制流水灯方向

    按键一接单片机P3 1 xff0c 按键2接P3 0 8个流水灯接P2口 以下是代码 xff1a include lt regx52 H gt include lt INTRINS H gt 延时函数 xff0c xms等于1 xff0c
  • 【SLAM】VINS-MONO解析——sliding window

    8 sliding window 8 1 理论基础 实际上 xff0c 这一部分跟后端非线性优化是一起进行的 xff0c 这一部分对应的非线性优化的损失函数的先验部分 理论基础部分的代码基本在第7章部分 8 1 1 上一次非线性优化结束 x
  • 【SLAM】VINS-Fusion解析——流程

    VINS Fusion分析 因为时间原因 xff0c 没有像vins mono看的和写的那么具体 有时间的话我会补充完整版 vins fusion不像mono那样有三个node xff0c 它只有一个node xff0c 在rosNodeT
  • 【SLAM】VINS-MONO解析——对vins-mono的一点小改动

    vins mono刷了三遍 xff0c 手写vio刷了两遍 xff0c SLAM十四讲刷了两三遍 xff0c 从一开始完全看不懂是啥 xff0c 不知道什么是SLAM xff0c 什么是VIO xff0c 什么是VINS xff0c 什么是
  • 【SLAM】VINS-MONO解析——基于vins-mono的双目slam系统开发

    这个系统是基于香港科技大学飞行机器人组的开源框架VINS Mono开发的 xff0c 原开源框架是针对单目SLAM 本双目SLAM系统是在原单目开源框架基础上的二次深度开发 xff0c 外部接口与原框架一致 这个项目是我的研究课题项目 xf
  • 【SLAM】VINS-MONO解析——回环检测和重定位

    9 回环检测与重定位 本部分内容涉及到的代码大部分在pose graph文件夹下 xff0c 少部分在vins estimator里 原创内容 xff0c 转载请先与我联系并注明出处 xff0c 谢谢 xff01 系列内容请点击 xff1a
  • API开发手册在线中文版

    Android中文版 api手册地址 xff1a http www matools com api android Bootstrap 3 api手册地址 xff1a http www matools com api bootstrap3
  • 项目规划时间轴流程图

    项目规划时间轴流程图 项目规划时间轴流程图 对一个项目从开始到竣工的整个过程进行总结归纳 时间线图 又叫时间轴图 能以历史进程为载体 将过往的重要事项或者里程碑 标注在轴线上 并加以说明 它的作用是能够可视化内容 以图文的形式呈现出来 时间
  • 架构功能图

    支付系统功能架构图 支付业务的基础系统的复杂性和稳定性是支付业务是否能够及时安全处理的根本 该支付系统功能架构图收集了支付宝的系统架构 完整的支付系统整体架构 从产品分类 模块功能和业务流程 了解支付产品服务的设计 支付系统要兼并合规性 易
  • Web开发技术架构图

    Web开发技术架构图 大型web系统架构动态应用 是相对于网站静态内容而言 是指以c c 43 43 php Java perl net等服务器端语言开发的网络应用软件 比如论坛 网络相册 1 学习Web开发原理 包括MVC MTV等Web
  • 大数据架构图

    大数据管理数据处理过程图 大数据 big data 指无法在一定时间范围内用常规软件工具进行捕捉 管理和处理的数据集合 是需要新处理模式才能具有更强的决策力 洞察力 大数据处理的主要流程包括数据收集 数据存储 数据处理 数据应用等主要环节
  • hutool定时器的使用封装

    启动定时器 建议在main 入口放置 支持秒级别定时任务 CronUtil setMatchSecond true 启动 CronUtil start 使用deamon模式 xff0c CronUtil start true 轮子 impo
  • 网络拓扑架构图

    阿里云应用网络拓扑图 云应用网络架构是指建立在可编程的基础设施之上 基于统一的运行管理平台 按需的分配资源 的网络架构 云网络架构在物理链路 主机网络 网关 控制器四个纬度上全面升级 大大提高了整体网络性能 公司办公网络拓扑图 对于很多大型
  • Kubernetes(K8S)中文文档

    Kubernetes K8S 中文文档 前言Kubernetes是什么 互动教程 Kubernetes 基础概述使用Minikube 部署 Kubernetes 集群使用 kubectl 创建 DeploymentKubernetes 中浏
  • ASCII纯文本绘制流程图

    我们使用纯文本写代码 xff0c 有了Markdown又可以使用纯文本写文档 xff0c 那么图片 xff0c 能不能使用纯文本描述呢 xff1f Text Flow是什么 xff1f Text Flow xff1a 一个强大的在线ASCI
  • ioDraw - 超好用的在线白板,能够手绘各种流程图、架构图

    今天 xff0c 推荐给大家的是一款超级好用的在线绘图工具 xff0c ioDraw在线白板 关于 ioDraw在线白板 一款非常轻量的在线白板工具 xff0c 可以直接在浏览器打开 xff0c 轻松绘制具有手绘风格的图形 比如可以绘制这样
  • ioDraw - 免费的在线图表制作工具

    介绍 xff1a ioDraw是一款数据可视化图表库 xff0c 提供直观 xff0c 生动 xff0c 可交互 xff0c 可个性化定制的数据可视化图表 xff0c 支持折线图 柱状图 饼图 散点图等 地址 xff1a https www
  • 在线流程图绘制工具测评-ioDraw

    对于产品经理和程序员而言 xff0c 流程图制作工具绝对是刚需 xff0c 比起流程图制作软件 xff0c 我更喜欢在线工具 今天跟大家分享一个好用的流程图在线制作工具 ioDraw xff0c 他是免费的 且简单易上手 xff0c 功能强
  • 一款免费好用的代码在线比较工具

    在线代码对比工具 xff08 码工具 xff09 是一款免费的在线文本对比工具 xff0c 无需注册 xff0c 可以在线对两段文本进行对比 xff0c 检测 比较两个文本有什么不同的差异 xff0c 以便修改 xff0c 常用于程序代码
  • LTE学习笔记之接口协议

    LTE 接口协议 三层两面三层两面 空中接口UuL2功能模块用户面控制面 L3功能模块 xff08 控制面 xff09 RRC模块1 系统信息广播2 寻呼3 RRC连接管理4 无线资源控制5 移动性管理NAS信令 地面接口同级接口 X2用户

随机推荐