【Hadoop生态圈】8.Flink实时计算引擎入门教程

2023-10-27

1.简介

  Fink是一个开源的分布式,高性能,高可用,准确的实时数据计算框架,它主要优点如下:

  • 流式计算: Fink可以连接处理流式(实时)数据。
  • 容错: Fink提供了有状态的计算,会记录任务的中间状态,当执行失败时可以实现故障恢复。
  • 可伸缩: Fink集群可以支持上千个节点。
  • 高性能: Fink能提供高吞吐,低延迟的性能。

三大实时计算框架对比:

  • Spark Streaming: 可以处理秒级别延迟的实时数据计算,但是无法处理真正的实时数据计算,适合小型且独立的实时项目。
  • Storm: 可以处理真正的实时计算需求,但是它过于独立没有自己的生态圈,适合能够接受秒级别延迟不需要Hadoop生态圈的实时项目。
  • Fink: 新一代实时计算引擎,它包含了Strorm和Spark Streaming的优点,它即可以实现真正意义的实时计算需求,也融入了Hadoop生态圈,适合对性能要求高吞吐低延迟的实时项目。

2.执行流程

在这里插入图片描述

3.核心三大组件

  • DataSource: 数据源,主要用来接受数据。例如: readTextFile(),socketTextStream(),fromCollection(),以及一些第三方数据源组件。
  • Transformation: 计算逻辑,主要用于对数据进行计算。例如:map(),flatmap(),filter(),reduce()等类型的算子。
  • DataSink: 目的地,主要用来把计算的结果数据输出到其他存储介质。例如Kafka,Redis,Elasticsearch等。

4.应用场景

  • 实时ETL: 集成实时数据计算系统现有的诸多数据通道和SQL灵活的加工能力,对实时数据进行清洗、归并和结构化处理。同时,为离线数仓进行有效的补充和优化,为数据实时传输计算通道。
  • 实时报表: 实时采集、加工和存储,实时监控和展现业务指标数据,让数据化运营实时化。
  • 监控预警: 对系统和用户行为进行实时检测和分析,实时检测和发现危险行为。
  • 在线系统: 实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略。

5.架构原理

Fink常用的两种架构是: Standalone(独立集群)和ON YARN。

  • Standalone: 独立部署,不依赖Hadoop环境,但是需要使用Zookeeper实现服务的高可用。
  • ON YARN: 依赖Hadoop环境的YARN实现Flink任务的调度,需要Hadoop版本2.2以上。

Flink ON YARN架构图如下:
在这里插入图片描述

  • 1.客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置
    上传到 HDFS,以便后续启动 Flink 相关组件的容器。
  • 2.YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给
    JobMaster。这里省略了 Dispatcher 组件。
  • 3.JobMaster 向资源管理器请求资源(slots)。
  • 4.资源管理器向 YARN 的资源管理器请求 container 资源。
  • 5.YARN 启动新的 TaskManager 容器。
  • 6.TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
  • 7.资源管理器通知 TaskManager 为新的作业提供 slots。
  • 8.TaskManager 连接到对应的 JobMaster,提供 slots。
  • 9.JobMaster 将需要执行的任务分发给 TaskManager,执行任务。

Flink ON YARN 在运行的时候可以细分为两种模式。

  • Session模式: 可以称为会话模式或多任务模式。这种模式会在YARN中初始化一个Flink集群,以后提交的任务都会提交到这个集群中,这个Flink集群会在YARN集群中,除非手动停止。
  • Per-Job模式: 可以称为单任务模式,这种模式每次提交Flink任务时任务都会创建一个集群,Flink任务之间都是互相独立,互不影响,执行任务资源会释放掉。

6.常用的API

Flink中提供了4种不同层次的API,每种都有对应的使用场景。

  • Sateful Stream Processing: 低级API,提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用一些复杂事件处理逻辑上。
  • DataStream/DataSet API: 核心API,提供了针对实时数据和离线数据的处理,是对低级API进行的封装,提供了filter(),sum(),max(),min()等高级函数,简单易用。
  • Table API: 对DataStream/DataSet API做了进一步封装,提供了基于Table对象的一些关系型API。
  • SQL: 高级语言,Flink的SQL是基于Apache Calcite开发的,实现了标准SQL(类似于Hive的SQL),使用起来比其他API更加方便。Table API和SQL可以很容易结合使用,它们都返回Table对象。

在工作中能用SQL解决的优先使用SQL,复杂一些的考虑DataStream/DataSet API

DataStreamAPI中常用的Transformation函数。

算子 描述
map() 对数据流中的每个元素进行处理,输入一个元素返回一个元素
flatMap() 与map()类似,但是每个元素可以返回一个或多个元素
filter() 对数据流中每个元素进行判断,如果返回True则将其保留,否则将其删除
keyBy() 根据key对数据流分组
union() 合并多个流,流的数据类型必须一致
connect() 只能连接两个流,两个流的数据类型可以不同

7.java编写flink程序

引入依赖,此文用的flink版本是1.15.2。

    <properties>
        <flink.version>1.15.2</flink.version>
        <java.version>1.8</java.version>
        <slf4j.version>1.7.30</slf4j.version>
        <!--flink依赖的作用域 provided 表示表示该依赖包已经由目标容器提供,compile 标为默认值 -->
        <flink.scope>compile</flink.scope>
    </properties>

    <dependencies>
        <!-- core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <!-- test dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>flink</finalName>
        <plugins>
            <!-- scala 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ljm.hadoop.flink.Main</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

DataStream执行模式:
从1.12.0版本以后,flink实现了api的流批一体化处理。DataStream新增一个执行模式(execution mode),通过设置不同的执行模式,即可实现流处理与批处理之间的切换,这样一来,dataSet基本就被废弃了。

  • STREAMING: 流执行模式(默认)
  • BATCH: 批执行模式
  • AUTOMATIC: 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
    在这里插入图片描述

以下为DataStream相关Api在Java中的简单应用

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置执行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        map(env);
//        flatMap(env);
//        union(env);
//        connect(env);
//        socketTextStream(env);
        env.execute("testJob");
    }

    /**
     * 对数据处理
     */
    private static void map(StreamExecutionEnvironment env) {
        //在测试阶段,可以使用fromElements构造数据流
        DataStreamSource<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 9, 11);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer num) throws Exception {
                return num - 1;
            }
        });
        //使用一个线程打印数据
        numStream.print().setParallelism(1);
        //多线程输出(最大值=cpu总核数)
        //numStream.print();
    }

    /**
     * 将数据中的每行数据根据符号拆分为单词
     */
    private static void flatMap(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.fromElements("hello,world", "hello,hadoop");
        //读取文件内容,文件内容格式  hello,world
        //DataStreamSource<String> data =  env.readTextFile("D:\\java\\hadoop\\text.txt");
        //处理数据
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        wordStream.print().setParallelism(1);
    }

    /**
     * 过滤数据中的奇数
     */
    private static void filter(StreamExecutionEnvironment env) {
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data1.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer num) throws Exception {
                return num % 2 == 1;
            }
        });
        numStream.print().setParallelism(1);
    }

    /**
     * 将两个流中的数字合并
     */
    private static void union(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4);
        //第2份数据流
        DataStreamSource<Integer> data2 = env.fromElements(3, 4, 5, 6);
        //合并流
        DataStream unionData = data1.union(data2);
        unionData.print().setParallelism(1);
    }

    /**
     * 将两个数据源中的数据关联到一起
     */
    private static void connect(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<String> data1 = env.fromElements("user:tom,age:18");
        //第2份数据流
        DataStreamSource<String> data2 = env.fromElements("user:jack_age:18");
        //连接两个流
        ConnectedStreams<String, String> connectedStreams = data1.connect(data2);
        //处理数据
        SingleOutputStreamOperator<String> resStream = connectedStreams.map(new CoMapFunction<String, String, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s.replace(",", "-");
            }
            @Override
            public String map2(String s) throws Exception {
                return s.replace("_", "-");
            }
        });
        resStream.print().setParallelism(1);
    }

    /**
     * 每隔3秒重socket读取数据
     */
    private static void socketTextStream(StreamExecutionEnvironment env) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //加载数据源
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9001);
        //数据处理
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = wordStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //根据Tuple2中的第1列分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordCountStream.keyBy(0);
        //窗口滑动设置,对指定时间窗口(例如3s内)内的数据聚合统计,并且把时间窗口内的结果打印出来
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(3));
        //根据Tuple2中的第2列进行合并数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumRes = windowedStream.sum(1);
        //数据输出
        sumRes.print();
    }

}

上面示例的socketTextStream方法中用到了socketTextStream函数需要通过netnat工具发送数据

netnat工具下载
在netnat目录下执行 nc -L -p 9001 -v
在这里插入图片描述

运行socketTextStream方法,可以发现控制台打印了数据
在这里插入图片描述
上图中的3和5表示线程Id,如果只需要单线程打印则需要在print()后面追加setParallelism(1);

sumRes.print().setParallelism(1);

修改完重新运行程序可以发现线程Id和>符号已经没有打印了。
在这里插入图片描述

在这里插入图片描述

8.把flink程序部署到hadoop环境上面运行

8.1.安装flink程序

flink下载地址,下载1.15.2版本然后上传到服务器 /home/soft/目录下解压

tar -zxvf flink-1.15.2-bin-scala_2.12.tgz

flink客户端节点上需要设置HADOOP_HOME和HADOOP_CLASSPATH环境变量

vi /etc/profile
export HADOOP_HOME=/home/soft/hadoop-3.2.4
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /etc/profile

8.2.编译java开发的flink应用

使用socketTextStream接受socket传输的数据
在这里插入图片描述

修改socketTextStream方法里面的代码,把127.0.0.1改成netcat工具部署机器ip地址

  DataStreamSource<String> data = env.socketTextStream("192.168.239.128", 9001);

需要把pom.xml文件中flink.scope属性值设置为provided,这些依赖不需要打进Jar包中。

    <properties>
        <flink.scope>provided</flink.scope>
    </properties>

执行命令打包

mvn clean package

在这里插入图片描述
把flink-1.0-SNAPSHOT.jar上传至/home/soft/flink-1.15.2目录下然后提交任务

8.3.提交Flink任务到YARN集群中

cd /home/soft/flink-1.15.2
bin/flink  run -m yarn-cluster -yjm 1024 -ytm 1024   flink-1.0-SNAPSHOT.jar

参数说明
bin/flink: 这个脚本启动的是Per-Job,bin/yarn-session.sh 则启动的是Session模式的
-m: 指定模式,yarn-cluster=集群模式,yarn-client=客户端模式
-yjm:每个JobManager内存 (default: MB)
-ytm:每个TaskManager内存 (default: MB)
在这里插入图片描述

8.4.测试任务并查看结果

在服务器上面安装netcat工具,然后发送数据,这台机器的ip必须和Java编写的Flink程序一致

yum install nc
nc -l 9001

在这里插入图片描述

使用浏览器访问: http://hadoop集群主节点ip:8088/cluster可以看到已提交的Flink任务,然后下图的点击顺序可以看到任务的执行结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

8.5.停止任务

通过YARN命令停止

yarn application -kill  application_1665651925022_0008  

或通过Flink命令停止

bin/flink cancel -yid application_1665651925022_0008  a39f8b9258c9b9d0c17eca768c5b54c3

【Hadoop生态圈】其它文章如下,后续会继续更新

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Hadoop生态圈】8.Flink实时计算引擎入门教程 的相关文章

随机推荐

  • PCA降维算法的介绍、多角度推导及python实现

    一 算法原理及数学推导 在对于数据的处理上 特征维度过高经常是一个无法忽视的问题 但是单纯的降维压缩对于数据本身会使得数据信息遭到严重损失 不过以PCA为代表的线性降维算法对这方面有所保证 它的目标是通过某种线性投影 将高维的数据映射到低维
  • JVM内存模型

    内存模型主要分为共享内存和线程独享内存 内存分配原理为1 JVM先检查new关键字 并在常量池中定位到一个类的引用 2 检查定位到的类是否已经经过了加载 解析 初始化等步骤 3 为其分配内存 存储对象实例 一 共享内存 1 运行时常量池 编
  • C++连接数据库

    一 sql h ifndef SQL H define SQL H include
  • 聊聊Spring注解@Transactional失效的那些事

    一 前言 emm 又又又踩坑啦 这次的需求主要是对逾期计算的需求任务进行优化 现有的计算任务运行时间太长了 简单描述下此次的问题 在项目中进行多个数据库执行操作时 我们期望的是将其整个封装成一个事务 要么全部成功 或者全部失败 然而在自测异
  • 通过刷脸来支付的确带给了我们很多便利

    在携带手机不方便或者受到限制的场所 也是刷脸支付的重要应用场景 比如游泳池附近 海边沙滩游乐场 比如易燃易爆的场所 学校 工厂等禁用手机的地方 随着刷脸支付如此私密的功能被越来越多的人接受 刷脸功能也一定不会止步于此 未来更多场景的应用是大
  • bottle.py 实现批量文件上传

    bottle py是python的一个Web框架 整个框架只有一个文件 几十K 却自带了路径映射 模板 简单的数据库访问等web框架组件 确实是个可用的框架 初学web开发可以拿来玩玩 其语法简单 部署也很方便 官方文档 http bott
  • 女程序员珍妮的故事

    我不想说出她的名字 这里就叫她珍妮吧 珍妮有5个最好的朋友 其中2个是 Google 的工程师 一个是 Eventbrite 的工程师 一个是架构师 另外一个是她的老爸 一个很可爱的人 是珍妮家乡的州足球队的主席 珍妮毕业时拿的是企业管理专
  • 梯度隐私泄露

    梯度隐私泄露 动机 梯度泄露推测方法 DLG 梯度泄露防御方法 梯度信息推测出数据集信息 三种预设 参考文献 动机 传统认为 在用户与用户 用户与中心之间传输梯度信息是安全的 本文打破了这种理所当然 已知学习模型 权重参数W和真实梯度 W
  • centos 运行.exe文件 storescu.exe 上传 DICOM文件

    由于不会使用DCMTK源码在centos中进行运行 所以使用windows下的exe进行上传 windows 下 DICOM文件上传命令 storescu exe aec LINKINGMED IP 104 v D DICOMSOURCE
  • linux开机自动挂载

    1 修改文件 vim etc fstab 在文件最后一行添加内容 dev cdrom mnt iso9660 defaults 0 0 2 验证是否挂载成功
  • mac打开ssh服务器文件,mac ssh 远程服务器文件

    mac ssh 远程服务器文件 内容精选 换一换 已成功登录Java性能分析 待安装Guardian的服务器已开启sshd 待安装Guardian的服务器已安装JRE JRE版本要求为Huawei JDK 8或者Open JDK 8 11
  • python s append_pandas中如何使用合并append函数?

    介绍了这么多关于pandas拼接的方法 那你知道如果想要拼接拼接一个或者多个 还可以追加serise到原来的dataframe里面如何操作吗 其实还是很简单的 使用append函数就可以解决 本文介绍pandas中使用合并append函数的
  • qq里面发送图片显示服务器被拒绝,如何解决qq无法发送图片的问题

    原因一 一般是网速比较慢 电信联通教育他们内部传图片大家都能收到 教育网给联通发就一般收不到了 清理下你的QQ图片文件夹 里面堆积的图片太多了 原因二 由于网络问题 查看手机移动数据流量或者wifi连接是否正常 QQ版本问题 将QQ版本升级
  • Python 中 pass的使用

    def sample n samples Generate random samples from the fitted Gaussian distribution pass 在python中有时候能看到定义一个def函数 函数内容部分填写
  • (一)人工智能大纲摘要:《人工智能发展白皮书-技术架构篇(2018年9月)》

    以下博客的主要内容 摘自白皮书 http www caict ac cn kxyj qwfb bps index 1 htm 人工智能发展白皮书 技术架构篇 2018年 人工智能安全白皮书 2018 2018世界人工智能产业发展蓝皮书 电信
  • 小米路由器4a千兆版刷固件_小米路由器4A千兆版恢复出厂设置的方法

    摘 要 说明 几乎所有品牌的家用无线路由器 都可以通过长按机身的复位按键 来进行恢复出厂设置 小米路由器4千兆版也不例外 同样可以通过长按复位按键的方式 来恢复出厂设置 具体的操作步骤和注意事项 下面 本文将给大家详细介绍 小米路由器4千兆
  • 【Python 3.7】三明治:编写一个函数,它接受顾客要在三明治中添加的一系列食材。这个 函数只有一个形参(它收集函数调用中提供的所有食材)……

    Python 3 7 三明治 编写一个函数 它接受顾客要在三明治中添加的一系列食材 这个函数只有一个形参 它收集函数调用中提供的所有食材 并打印一条消息 对顾客点的三明治进行概述 调用这个函数三次 每次都提供不同数量的实参 传递任意数量的实
  • C++11 noexcept

    C 11 引入了noexcept 它有两类作用 noexcept 指定符和noexcept 运算符 一 noexcept 指定符 1 含义 指定函数是否抛出异常 2 两类语法 noexcept 1 同noexcept true noexce
  • QT中 No such file or directory的解决办法

    报错具体情境 使用D Qt Qt5 8 0 5 8 mingw53 32 gt 终端进行编译程序 步骤如下 qmake project 生成CH01 pro 文件 qmake CH01 pro 生成 Makefile 总的makefile
  • 【Hadoop生态圈】8.Flink实时计算引擎入门教程

    文章目录 1 简介 2 执行流程 3 核心三大组件 4 应用场景 5 架构原理 6 常用的API 7 java编写flink程序 8 把flink程序部署到hadoop环境上面运行 8 1 安装flink程序 8 2 编译java开发的fl