flink大数据处理流式计算详解

2023-11-19

flink大数据处理

文章目录

flink官方文档地址

  • 离线计算和实时计算 :是对数据处理的【延迟】不一样(一个实时和非实时)
  • 流式计算和批量计算: 是对数据处理的【方式】不一样(一个流式和一个批量)
  • 结论:离线和批量不等价,实时和流式不等价,因为不是同个维度的东西

二、WebUI可视化界面(测试用)

  • 访问:ip:8081
  • 方式一:服务端部署Flink集群(生产环境)
  • 方式二:本地依赖添加(测试开发)
      <!--Flink web ui-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

在这里插入图片描述

  • nc命令介绍
    • Linux nc命令用于设置网络路由的
    • nc -lk 8888
    • 开启 监听模式,用于指定nc将处于监听模式, 等待客户端来链接指定的端口
    • 1 解压netcat-win32-1.11.zip
    • 2 配置解压好的目录路径到PATH环境变量
    • 3 测试
    nc -l -p 8888
    
  • win | linux 需要安装

在这里插入图片描述

http://127.0.0.1:8081/

本地UI界面
在这里插入图片描述

三、Flink部署

  • 运行流程
    • 用户提交Flink程序到JobClient,
    • JobClient的 解析、优化提交到JobManager
    • TaskManager运行task, 并上报信息给JobManager
    • 通俗解释
      • JobManager 包工头
      • TaskManager 任务组长
      • Task solt 工人 (并行去做事情)
        在这里插入图片描述
        在这里插入图片描述

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序
* 运行时由两种类型的进程组成
* 一个 JobManager

* 一个或者多个 TaskManager

在这里插入图片描述

  • 什么是JobManager(大Boss,包工头)
    • 协调 Flink 应用程序的分布式执行的功能
      • 它决定何时调度下一个 task(或一组 task)
      • 对完成的 task 或执行失败做出反应
      • 协调 checkpoint、并且协调从失败中恢复等等
  • 什么是TaskManager (任务组长,搬砖的人)
    • 负责计算的worker,还有上报内存、任务运行情况给JobManager等
    • 至少有一个 TaskManager,也称为 worker执行作业流的 task,并且缓存和交换数据流
    • 在 TaskManager 中资源调度的最小单位是 task slot

3.1 JobManager

JobManager进程由三个不同的组件组

ResourceManager

负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots

Dispatcher

提供了一个 REST 接口,用来提交 Flink 应用程序执行
为每个提交的作业启动一个新的 JobMaster。
运行 Flink WebUI 用来提供作业执行信息

JobMaster

负责管理单个JobGraph的执行,Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster
至少有一个 JobManager,高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby

3.2 TaskManager

TaskManager中 task slot 的数量表示并发处理 task 的数量
一个 task slot 中可以执行多个算子,里面多个线程
算子 opetator
source
transformation
sink
对于分布式执行,Flink 将算子的 subtasks _链接_成 tasks,每个 task 由一个线程执行
图中source和map算子组成一个算子链,作为一个task运行在一个线程上
将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
在这里插入图片描述

  • 5 个 subtask 执行,因此有 5 个并行线程
    • Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。
    • Sub-Task 强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task
    • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
      • 算子链接成 一个 task 它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
        在这里插入图片描述

Task Slots 任务槽

  • Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask ,每个subtask会以单独的线程来运行
  • 每个 worker(TaskManager)是一个 JVM 进程,可以在单独的线程中执行一个(1个solt)或多个 subtask
  • 为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)
  • 每个 task slot 代表 TaskManager 中资源的固定子集
  • 注意
    • 所有Task Slot平均分配TaskManger的内存, TaskSolt 没有 CPU 隔离
    • 当前 TaskSolt 独占内存空间,作业间互不影响
    • 一个TaskManager进程里有多少个taskSolt就意味着多少个并发
    • task solt数量建议是cpu的核数,独占内存,共享CPU

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/flink-architecture/#tasks-and-operator-chains

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/

  • Flink 是分布式流式计算框架
    • 程序在多节点并行执行,所以就有并行度 Parallelism
    • DataStream 就像是有向无环图(DAG),每一个 数据流(DataStream) 以一个或多个 source 开始,以一个或多个 sink 结束
  • 流程
    • 一个数据流( stream) 包含一个或多个分区,在不同的线程/物理机里并行执行
    • 每一个算子( operator) 包含一个或多个子任务( subtask),子任务在不同的线程/物理机里并行执行
    • 一个算子的子任务subtask 的个数就是并行度( parallelism)
      在这里插入图片描述

3.3 并行度的调整配置

  • Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级
  • Flink并行度配置级别 (高到低)
    • 算子
      • map( xxx ).setParallelism(2)
    • 全局env
      • env.setParallelism(2)
    • 客户端cli
      • ./bin/flink run -p 2 xxx.jar
    • Flink配置文件
      • /conf/flink-conf.yaml 的 parallelism.defaul 默认值
  • 某些算子无法设置并行度
  • 本地IDEA运行 并行度默认为cpu核数

3.4 区分 TaskSolt和parallelism并行度配置

  • task slot是静态的概念,是指taskmanager具有的并发执行能力;
  • parallelism是动态的概念,是指 程序运行时实际使用的并发能力
  • 前者是具有的能力比如可以100个,后者是实际使用的并发,比如只要20个并发就行。
  • Flink有3中运行模式
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    
    • STREAMING 流处理
    • BATCH 批处理
    • AUTOMATIC 根据source类型自动选择运行模式,基本就是使用这个

四、Source Operator(资源算子)

在这里插入图片描述

  • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理
  • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发
    • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

在这里插入图片描述

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系统
      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket
      • env.socketTextStream(“ip”, 8888)
    • 自定义Source,实现接口自定义数据源,rich相关的api更丰富
      • 并行度为1
        • SourceFunction
        • RichSourceFunction
      • 并行度大于1
        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors与第三方系统进行对接(用于source或者sink都可以)
    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
  • Apache Bahir连接器
    • 里面也有kafka、RabbitMQ、ES的连接器更多

和外部系统进行读取写入的

  • 第一种 Flink 里面预定义的 source 和 sink。
  • 第二种 Flink 内部也提供部分 Boundled connectors。
  • 第三种是第三方 Apache Bahir 项目中的连接器。
  • 第四种是通过异步 IO 方式
    • 异步I/O是Flink提供的非常底层的与外部系统交互

设置不同的并行度

package cn.mesmile.flink.demo;

import cn.mesmile.flink.jdkstream.VideoOrder;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zb
 * @date 2022/8/21 16:56
 * @Description
 */
public class FlinkCustomSourceDemo04 {

    public static void main(String[] args) throws Exception {
        // 构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建本地 UI 界面操作  127.0.0.1:8081
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(2);

        // 本机默认并行数为 12  ---> 本机配置为 6 核 12 线程
        VideoOrderSource videoOrderSource = new VideoOrderSource();

        DataStream<VideoOrder> videoOrderDataStream = env.addSource(videoOrderSource);
        videoOrderDataStream.filter(new FilterFunction<VideoOrder>() {
            @Override
            public boolean filter(VideoOrder value) throws Exception {
                return value.getMoney() > 5;
            }
        }).setParallelism(3);


        videoOrderDataStream.print().setParallelism(4);

        //DataStream需要调用execute,可以取个名称
        env.execute("custom source job");
    }
}

在这里插入图片描述

五、Sink Operator(输出算子)

  • Sink 输出源
    • 预定义
      • print
      • writeAsText (过期)
    • 自定义
      • SinkFunction
      • RichSinkFunction
        • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
    • flink官方提供 Bundle Connector
      • kafka、ES 等
    • Apache Bahir
      • kafka、ES、Redis等

六、Flink滑动-滚动时间窗和触发器

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/windows/

Windows are at the heart of processing infinite streams(Window是处理无限数据量的核心)

数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等

Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算

窗口认为是Bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶

  • time Window 时间窗口,即按照一定的时间规则作为窗口统计

    time-tumbling-window 时间滚动窗口 (用的多)

    time-sliding-window 时间滑动窗口 (用的多)

    session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用

  • count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用

  • 滑动窗口 Sliding Windows

    • 窗口具有固定大小
    • 窗口数据有重叠
    • 例子:每10s统计一次最近1min内的订单数量

在这里插入图片描述

  • 滚动窗口 Tumbling Windows
    • 窗口具有固定大小
    • 窗口数据不重叠
    • 例子:每10s统计一次最近10s内的订单数量

在这里插入图片描述

  • 【窗口大小size】 和 【滑动间隔 slide】
    • tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
    • sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据
    • size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所以开发中不用

6.1 窗口API

  • 有keyBy 用 window() api
  • 没keyBy 用 windowAll() api ,并行度低
  • 方括号 ([…]) 中的命令是可选的,允许用多种不同的方式自定义窗口逻辑

在这里插入图片描述

  • 一个窗口内 的是左闭右开
  • countWindow没过期,但timeWindow在1.12过期,统一使用window;
  • 窗口分配器 Window Assigners
    • 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window窗口上
    • window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner
  • 窗口触发器 trigger
    • 用来控制一个窗口是否需要被触发
    • 每个 窗口分配器WindowAssigner 都有一个默认触发器,也支持自定义触发器
  • 窗口 window function ,对窗口内的数据做啥?
    • 定义了要对窗口中收集的数据做的计算操作

    • 增量聚合函数

      aggregate(agg函数,WindowFunction(){ })

      • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
      • 常见的增量聚合函数有 reduceFunction、aggregateFunction
      • min、max、sum 都是简单的聚合操作,不需要自定义规则
        AggregateFunction<IN, ACC, OUT>
        IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
    • 全窗口函数

      apply(new processWindowFunction(){ })

      • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
      • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)
        IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
        WindowFunction<IN, OUT, KEY, W extends Window>
    • 如果想处理每个元素更底层的API的时候用

      //对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter

      process(new KeyedProcessFunction(){processElement、onTimer}

  • 基于数量的滚动窗口, 滑动计数窗口
  • 案例:
    • 统计分组后同个key内的数据超过5次则进行统计 countWindow(5)
    • 只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)

6.2 AggregateFunction增量聚合函数

  • 定义了要对窗口中收集的数据做的计算操作

  • 增量聚合函数

    aggregate(agg函数,WindowFunction(){ })

    • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
    • 常见的增量聚合函数有 reduceFunction、aggregateFunction
    • min、max、sum 都是简单的聚合操作,不需要自定义规则
      AggregateFunction<IN, ACC, OUT>
      IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据

6.3 全窗口函数

apply(new WindowFunction(){ })

  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗

WindowFunction<IN, OUT, KEY, W extends Window>

6.4 processWindowFunction全窗口函数知识

全窗口函数
process(new ProcessWindowFunction(){})
窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
ProcessWindowFunction<IN, OUT, KEY, W extends Window>

在这里插入图片描述

窗口函数对比

  • 增量聚合
    • aggregate(new AggregateFunction(){});
  • 全窗口聚合
    • apply(new WindowFunction(){})
    • process(new ProcessWindowFunction(){}) //比上面apply强
  • * Flink里面定义窗口,可以引用不同的时间概念
  • Flink里面时间分类
    • 事件时间EventTime(重点关注)
      • 事件发生的时间
      • 事件时间是每个单独事件在其产生进程上发生的时间,这个时间通常在记录进入 Flink 之前记录在对象中
      • 在事件时间中,时间值 取决于数据产生记录的时间,而不是任何Flink机器上的
    • 进入时间 IngestionTime
      • 事件到进入Flink
    • 处理时间ProcessingTime
      • 事件被flink处理的时间
      • 指正在执行相应操作的机器的系统时间
      • 是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟
      • 但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题

在这里插入图片描述

  • 事件时间(EventTime)已经能够解决所有的问题了,那为何还要用处理时间呢????
  • 处理时间(ProcessingTime)由于不用考虑事件的延迟与乱序,所以处理数据的速度高效
  • 如果一些应用比较重视处理速度而非准确性,那么就可以使用处理时间(ProcessingTime),但结果具有不确定性
  • 事件时间(EventTime)有延迟,但是能够保证处理的结果具有准确性,并且可以处理延迟甚至无序的数据

做了一个电商平台买 “超短男装衣服”,如果要统计10分钟内成交额,你认为是哪个时间比较好?

  • (EventTime) 下单支付时间是2022-11-11 01-01-01
  • (IngestionTime ) 进入Flink时间2022-11-11 01-03-01(网络拥堵、延迟)
  • (ProcessingTime)进入窗口时间2022-11-11 01-31-01(网络拥堵、延迟)

七、link乱序延迟时间处理-Watermark

  • 一般我们都是用EventTime事件时间进行处理统计数据
  • 但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确
  • 需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达
    • 当 12:01:10 秒数据到达的时候,不立刻触发窗口计算
    • 而是等一定的时间,等迟到的数据来后再关闭窗口进行计算
  • 每天10点后就是迟到,需要扣工资
  • 老王上班 路途遥远(延迟) 经常迟到
    • HR就规定迟到5分钟后就罚款100元(5分钟就是watermark)
    • 迟到30分钟就是上午事假处理 (5~30分就是 allowLateness )
    • 不请假都是要来的 (超过30分钟就是侧输出流,sideOutPut兜底)
  • 超过5分钟就不用来了吗?还是要来的继续工作的,不然今天上午工资就没了
  • 那如果迟到30分钟呢? 也要来的,不然就容易产生更大的问题,缺勤开除。。。。
  • Watermark 水位线介绍
    • 由flink的某个operator操作生成后,就在整个程序中随event数据流转
      • With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
      • With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
    • 衡量数据是否乱序的时间,什么时候不用等早之前的数据
    • 是一个全局时间戳,不是某一个key下的值
    • 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
    • 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
    • 注意
      • Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
      • 需要经过测试,和业务相关联,得出一个较合适的值即可
  • 窗口触发计算的时机
    • watermark之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )
    • watermark之后,触发计算的时机
      • 窗口内有数据
      • Watermaker >= Window EndTime窗口结束时间
    • 触发计算后,其他窗口内数据再到达也被丢弃
    • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间

在这里插入图片描述

  • window大小为10s,窗口是W1 [23:12:00~23:12:10) 、 W2[23:12:10~23:12:20)
    • 下面是数据的event time
    • 数据A 23:12:07
    • 数据B 23:12:11
    • 数据C 23:12:08
    • 数据D 23:12:17
    • 数据E 23:12:09
  • 没加入watermark,由上到下进入flink
    • 数据B到了之后,W1就进行了窗口计算,数据只有A
    • 数据C 迟到了3秒,到了之后,由于W1已经计算了,所以就丢失了数据C
  • 加入watermark, 允许5秒延迟乱序,由上到下进入flink
    • 数据A到达
      • watermark = 12:07 - 5 = 12:02 < 12:10 ,所以不触发W1计算, A属于W1
    • 数据B到达
      • watermark = max{ 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, B属于W2
    • 数据C到达
      • watermark = max{12:08, 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, C属于W1
    • 数据D到达
      • watermark = max{12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 触发W1计算, D属于W2
    • 数据E到达
      • watermark = max{12:09, 12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 之前已触发W1计算, 所以丢失了E数据,
  • Watermaker 计算 = **当前计算窗口最大的事件时间 **- 允许乱序延迟的时间
  • 什么时候触发W1窗口计算
    • Watermaker >= Window EndTime窗口结束时间
    • 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
  • 测试数据
    • 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
    • 触发窗口计算条件
      • 窗口内有数据
      • watermark >= 窗口endtime
      • 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间

java,2022-11-11 23:12:07,10

java,2022-11-11 23:12:11,10

java,2022-11-11 23:12:08,10

mysql,2022-11-11 23:12:13,10 // 触发 13 - 3 ≥ 10

java,2022-11-11 23:12:13,10

java,2022-11-11 23:12:17,10

java,2022-11-11 23:12:09,10

java,2022-11-11 23:12:20,10

java,2022-11-11 23:12:22,10

java,2022-11-11 23:12:23,10 // 触发 23 -3 ≥ 20

  • 窗口时间
  • 并行度调整为1

7.1 数据延迟处理

Flink 最后的兜底延迟数据处理 测输出流实战

  • 超过了watermark的等待后,还有延迟数据到达怎么办?
  • watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据
  • 数据超过了allowedLateness 后,就丢失了吗?用侧输出流 SideOutput

八、Flink乱序延迟时间处理-多层保证措施

简介: Flink乱序延迟时间处理-多层保证措施介绍和归纳

  • 面试题:如何保证在需要的窗口内获得指定的数据?数据有乱序延迟
    • flink采用watermark 、allowedLateness() 、sideOutputLateData()三个机制来保证获取数据
    • watermark的作用是防止数据出现延迟乱序,允许等待一会再触发窗口计算,提前输出
    • allowLateness,是将窗口关闭时间再延迟一段时间.设置后就像window变大了
      • 那么为什么不直接把window设置大一点呢?或者把watermark加大点?
      • watermark先输出数据,allowLateness会局部修复数据并主动更新窗口的数据输出
      • 这期间的迟到数据不会被丢弃,而是会触发窗口重新计算
    • sideOutPut是最后兜底操作,超过allowLateness后,窗口已经彻底关闭了,就会把数据放到侧输出流
      • 测输出流 OutputTag tag = new OutputTag(){}, 由于泛型查除问题,需要重写方法,加花括号
  • 应用场景:实时监控平台
    • 可以用watermark及时输出数据
    • allowLateness 做短期的更新迟到数据
    • sideOutPut做兜底更新保证数据准确性
  • 总结Flink的机制
    • 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
    • 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
    • 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
    • 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
    • 注意
      • Flink 默认的处理方式直接丢弃迟到的数据
      • sideOutPut还可以进行分流功能
      • DataStream没有getSideOutput方法,SingleOutputStreamOperator才有,
  • 版本弃用API

新接口,WatermarkStrategyTimestampAssignerWatermarkGenerator 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式

新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了

九、Flink的状态State管理

Flink的状态State介绍和应用场景解析

  • 什么是State状态
    • 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等
    • 是一个Operator的运行的状态/历史值,是维护在内存中
    • 流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果,然后把结果更新到状态里面

  • 有状态和无状态介绍

    • 无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter
    • 有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作
  • 状态管理分类

    • ManagedState(用的多)
      • Flink管理,自动存储恢复
      • 细分两类
        • Keyed State 键控状态(用的多)
          • 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态
          • 一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化
          • ValueState、ListState、MapState等数据结构
        • Operator State 算子状态(用的少,部分source会用)
          • ListState、UnionListState、BroadcastState等数据结构
    • RawState(用的少)
      • 用户自己管理和维护
      • 存储结构:二进制数组
  • State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)

    • ValueState 简单的存储一个值(ThreadLocal / String)
      • ValueState.value()
      • ValueState.update(T value)
    • ListState 列表
      • ListState.add(T value)
      • ListState.get() //得到一个Iterator
    • MapState 映射类型
      • MapState.get(key)
      • MapState.put(key, value)
  • State状态后端:存储在哪里

  • Flink 内置了以下这些开箱即用的 state backends :

    • (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
      • 如果没有其他配置,系统将使用 HashMapStateBackend。
    • (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
      • 如果不设置,默认使用 MemoryStateBackend。
  • 状态详解

    • HashMapStateBackend 保存数据在内部作为Java堆的对象。
      • 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
      • 非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
      • 但是状态大小受集群内可用内存的限制
      • 场景:
        • 具有大状态、长窗口、大键/值状态的作业。
        • 所有高可用性设置。
    • EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据
      • 该数据库(默认)存储在 TaskManager 本地数据目录中
      • 与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
      • RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
      • 但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
      • 场景
        • 具有非常大状态、长窗口、大键/值状态的作业。
        • 所有高可用性设置
    • 旧版
      MemoryStateBackend(内存,不推荐在生产场景使用)
      FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)

    RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)

  • 配置

    • 方式一:可以flink-conf.yaml使用配置键在 中配置默认状态后端state.backend
      配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend)
      或实现状态后端工厂StateBackendFactory的类的完全限定类名

    #全局配置例子一

    state.backend: hashmap

    state.checkpoint-storage: jobmanager

    #全局配置例子二

    state.backend: rocksdb

    state.checkpoints.dir: file:///checkpoint-dir/

    state.checkpoint-storage: filesystem

    • 方式二:代码 单独job配置例子
    //代码配置一(基于内存)
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.setStateBackend(new HashMapStateBackend());
    
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    
    
    //代码配置二(基于磁盘)
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    
    env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
    
    //或者
    
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
    
    
    - 备注:使用 RocksDBStateBackend 需要加依赖
    
    <dependency>
    
      <groupId>org.apache.flink</groupId>
    
      <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
    
      <version>1.13.1</version>
    
    </dependency>
    

Flink的Checkpoint-SavePoint和端到端状态一致性介绍

  • 什么是Checkpoint 检查点
    • Flink中所有的Operator的当前State的全局快照
    • 默认情况下 checkpoint 是禁用的
    • Checkpoint是把State数据定时持久化存储,防止丢失
    • 手工调用checkpoint,叫 savepoint,主要是用于flink集群维护升级等
    • 底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性
  • 开箱即用,Flink 捆绑了这些检查点存储类型:
    • 作业管理器检查点存储 JobManagerCheckpointStorage
    • 文件系统检查点存储 FileSystemCheckpointStorage
  • 配置

//全局配置checkpoints

state.checkpoints.dir: hdfs:///checkpoints/

//作业单独配置checkpoints

env.getCheckpointConfig().setCheckpointStorage(“hdfs:///checkpoints-data/”);

//全局配置savepoint

state.savepoints.dir: hdfs:///flink/savepoints

  • Savepoint 与 Checkpoint 的不同之处

    • 类似于传统数据库中的备份与恢复日志之间的差异
    • Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
    • Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
    • Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
    • 除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
  • 端到端(end-to-end)状态一致性

    数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的

    在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)

    端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。

    • Source
      • 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置
    • 内部
      • 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据
    • Sink:
      • 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

十、Flink 复杂事件处理 CEP讲解+案例实战

  • 什么是FlinkCEP
  • 用途
    • 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
    • 允许业务定义要从输入流中提取的复杂模式序列
  • 使用流程
    • 定义pattern
    • pattern应用到数据流,得到模式流
    • 从模式流 获取结果
DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(

  new SimpleCondition<Event>() {

  @Override

  public boolean filter(Event event) {

  return event.getId() == 42;

  }

  }

  ).next("middle").subtype(SubEvent.class).where(

  new SimpleCondition<SubEvent>() {

  @Override

  public boolean filter(SubEvent subEvent) {

  return subEvent.getVolume() >= 10.0;

  }

  }

  ).followedBy("end").where(

  new SimpleCondition<Event>() {

  @Override

  public boolean filter(Event event) {

  return event.getName().equals("end");

  }

  }

  );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(

  new PatternProcessFunction<Event, Alert>() {

  @Override

  public void processMatch(

  Map<String, List<Event>> pattern,

  Context ctx,

  Collector<Alert> out) throws Exception {

  out.collect(createAlertFrom(pattern));

  }

  });

- CEP并不包含在flink中,使用前需要自己导入

  <dependency>

  <groupId>org.apache.flink</groupId>

  <artifactId>flink-cep_${scala.version}</artifactId>

  <version>${flink.version}</version>

</dependency>
  • 模式(Pattern):定义处理事件的规则
    • 三种模式PatternAPI
      • 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式
      • 组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式
      • 模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组
  • 近邻模式
    • 严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()
    • 宽松近邻:允许中间出现不匹配的事件,API是.followedBy()
    • 非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()
    • 指定时间约束:指定模式在多长时间内匹配有效,API是within
    • 如果您不希望事件类型直接跟随另一个,notNext()
    • 如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()
    • 模式分类
      • 单次模式:接收一次一个事件
      • 循环模式:接收一个或多个事件
  • 其他参数
    • times:指定固定的循环执行次数
    • greedy:贪婪模式,尽可能多触发
    • oneOrMore:指定触发一次或多次
    • timesOrMore:指定触发固定以上的次数
    • optional:要么不触发要么触发指定的次数

在这里插入图片描述

十一、Flink项目打包插件讲解+部署阿里云实战

Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

* 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/

* Local 本地部署,直接启动进程,适合调试使用

* 直接部署启动服务
* Standalone Cluster集群部署,flink自带集群模式

* Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率

* Kubernetes 部署

* Docker部署

11.1本地安装

* Flink下载地址

* https://flink.apache.org/zh/downloads.html
* flink版本 1.13.1(课程安装包那边有提供)
* 步骤
* 解压 tar -zxvf
* 目录介绍
* conf
* flink-conf.yaml

#web ui 端口
rest.port=8081
​

#调整

jobmanager.memory.process.size: 1000m
taskmanager.memory.process.size: 1000m

*   bin

*   start-cluster.sh
*   stop-cluster.sh
*   yarn-session.sh

*   example
*   启动 bin/start-cluster.sh
*   停止 bin/stop-cluster.sh

*   查看进程 jps

*   TaskManagerRunner
*   StandaloneSessionClusterEntrypoint

*   网络安全组或者防火墙开放端口 8081
*   访问地址 http://ip:8081

11.2 测试本地安装

flink测试官方案例

  • 创建文件
cd /usr/local/software/flink/examples/source
vim xdclass_source.txt
​
java xdclass
springboot springcloud
html flink
springboot redis
java flink
kafka flink
java springboot
  • bin目录运行
./flink run  /usr/local/software/flink/examples/batch/WordCount.jar  
--input /usr/local/software/flink/examples/source/xdclass_source.txt 
--output /usr/local/software/flink/examples/source/xdclass_result.txt
  • 访问web UI (有个小bug,内存不够,页面访问失败则重新点击或者加大内存)

在这里插入图片描述

11.3 maven常用插件介绍和本地Flink项目打包 * 添加打包插件

<build>
        <finalName>xdclass-flink</finalName>
    <plugins><!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,-->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
                <encoding>${file.encoding}</encoding>
            </configuration>
        </plugin><!-- 添加依赖到jar包 -->
        <!--<plugin>-->
            <!--<artifactId>maven-assembly-plugin</artifactId>-->
            <!--<configuration>-->
                <!--<descriptorRefs>-->
                    <!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
                <!--</descriptorRefs>-->
            <!--</configuration>-->
            <!--<executions>-->
                <!--<execution>-->
                    <!--<id>make-assembly</id>-->
                    <!--<phase>package</phase>-->
                    <!--<goals>-->
                        <!--<goal>single</goal>-->
                    <!--</goals>-->
                <!--</execution>-->
            <!--</executions>-->
        <!--</plugin>--><plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • 打包插件
    • maven-jar-plugin,默认的打包插件,用来打普通的jar包,需建立lib目录里来存放需要的依赖包
    • maven-shade-plugin (推荐) 将依赖的jar包打包到当前jar包,成为fat JAR包,也可以防止类冲突 隔离
    • maven-assembly-plugin,大数据项目用的比较多,支持自定义的打包结构,比如sql/shell等
  • 测试插件
    • maven-surefire-plugin, 用于mvn 生命周期的测试阶段的插件,通过参数设置在junit下控制测试

运行

通过WebUI部署Flink项目到阿里云Linux运行

* 访问WebUI

* 上传jar包

* 选择main入口类APP
* 提交任务查看情况
* Task Solt 是指taskmanager的并发执行能力,parallelism是指taskmanager实际使用的并发能力

taskmanager.numberOfTaskSlots:4
​
假如每一个taskmanager中的分配4TaskSlot,
那有3个taskmanager一共有12TaskSlot
  • 测试数据
AA,2022-11-11 12:01:01,-1
BB,2022-11-11 12:01:02,1
AA,2022-11-11 12:01:04,-1
AA,2022-11-11 12:01:05,-1
  • 并行度和solt的疑惑
    • Task Slots 是具备的并发能力,大于 Parallelism并行度(实际用的)
    • 数据流里面算子的最大并行度就是Parallelism, 2-2-2-3-1 这样的并行度,最大就是3(同个任务job里面)

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JsWSazxh-1678068577521)(image/image_bbmzvy5EfP.png)]

十二、docker 安装

  • * 每个 Flink 集群的作业里,都是有客户端在运行,主要是获取 Flink 应用程序的代码,将其转换为 JobGraph 并提交给 JobManager
    • JobManager 将工作分配到 TaskManagers 上,在那里运行实际的操作符(例如源、转换和接收器)
    • 客户端是啥?
      • 我们前面测试的 bin/flink run xxx 这个就是客户端的一种,提交任务给flink集群运行
      • 或者WebUI界面那边提交任务

  • Local 本地部署,直接启动进程,适合调试使用
    • 直接部署启动服务
  • Standalone Cluster集群部署,flink自带集群模式
  • Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
    • Yarn集群三种模式介绍,
      • Session模式
        • JM 负载瓶颈,main 方法在客户端执行
        • 保留了Standalone的优势,需要事先申请资源,启动固定数量的JobManager和TaskManger (JobManager只有一个)
        • 常驻在内存,提交到这个集群的作业可以直接运行
        • Session的资源总量有限,多个job之间不是隔离的,故可能会造成资源的争用或者宕机影响
        • 适合场景:对延迟非常敏感但运行时长较短的作业
      • Per-Job模式(Docker-Compose不支持)
        • JM 负载瓶颈,main 方法在客户端执行
        • 各自形成单独的Flink集群,拥有专属的JobManager和TaskManager
        • 一个作业的TaskManager失败不会影响其他作业的运行, 作业完成后相关资源会被清除
        • 当机器上有多个 client 时,有较高的网络负载:传输 jar 、消耗大量的 CPU 来执行 main方法
        • 适合场景:规模大长时间运行的作业
      • Application模式(Flink 1.11版本中)
        • 和Per-Job模式类似
        • 主要是为了解决 Per-Job Mode 的不足,避免 带宽、CPU 的热点问题
  • 注意:
    • Docker 上的 Flink 不支持Per-Job 模式。
  • HA模式
    • standalone cluster HA
    • YARN cluster HA
    • 需要JDK、zookeeper HA、flink 等程序进行构建,至少需要三个物理机。
    • K8S
    • 云厂商:阿里云、华为云、亚马逊云等
  • 官方文档
  • 机器和配置准备
    • 关闭local模式部署的flink进程
    • 安装docker和docker-compose
  • 创建docker-compose.yml 文件 _Session Cluster_模式
version: "3.7"
services:
  jobmanager:
    image: flink:scala_2.12-java8
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagertaskmanager:
    image: flink:scala_2.12-java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 3
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

在这里插入图片描述

每个 manage 有 2 个 slot 所以最大并行度为 3(个manage)* 2(个slot) =6

version: "3.7"
services:
  flink-jobmanager-01:
    image: flink:scala_2.12-java8
    container_name: flink-jobmanager-01
    hostname: flink-jobmanager-01
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
 
  flink-taskmanager-01:
    image: flink:scala_2.12-java8
    container_name: flink-taskmanager-01
    hostname: flink-taskmanager-01
    expose:
      - "6121"
      - "6122"
    depends_on:
      - flink-jobmanager-01
    command: taskmanager
    links:
      - "flink-jobmanager-01:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
  
  flink-taskmanager-02:
    image: flink:scala_2.12-java8
    container_name: flink-taskmanager-02
    hostname: flink-taskmanager-02
    expose:
      - "6121"
      - "6122"
    depends_on:
      - flink-jobmanager-01
    command: taskmanager
    links:
      - "flink-jobmanager-01:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01

端口说明

The Web Client is on port 8081
JobManager RPC port 6123
TaskManagers RPC port 6122
TaskManagers Data port 6121
​
注意:
  expose暴露容器给link到当前容器的容器
  ports是暴露容器端口到宿主机端口进行映
  • 问题
    • 内存不足 (其他程序不运行,最少也需要1核2g,建议是4或者8g)
    • 网络安全组没开放端口 8081

十三、总结

  • 课程总结
    • Source
    • Transformation
    • Sink
  • 特性
    • Window
    • Watermark
    • 窗口函数
    • CEP
    • State和Checkpoint
  • Flink进阶
    • Flink内存管理和优化、Blink、SQL、Table API 、容错、HA、新特性
    • 多流连接Join、触发器、定时器、通信组件Akka/RPC原理、JobGraph流程
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink大数据处理流式计算详解 的相关文章

  • 在 Linux 上使用多处理时,TKinter 窗口不会出现

    我想生成另一个进程来异步显示错误消息 同时应用程序的其余部分继续 我正在使用multiprocessingPython 2 6 中的模块来创建进程 我试图用以下命令显示窗口TKinter 这段代码在Windows上运行良好 但在Linux上
  • 将 jar 作为 Linux 服务运行 - init.d 脚本在启动应用程序时卡住

    我目前正在致力于在 Linux VM 上实现一个可运行的 jar 作为后台服务 我已经使用了找到的例子here https gist github com shirish4you 5089019作为工作的基础 并将 start 方法修改为
  • 我的线程图像生成应用程序如何将其数据传输到 GUI?

    Mandelbrot 生成器的缓慢多精度实现 线程化 使用 POSIX 线程 Gtk 图形用户界面 我有点失落了 这是我第一次尝试编写线程程序 我实际上并没有尝试转换它的单线程版本 只是尝试实现基本框架 到目前为止它是如何工作的简要描述 M
  • 如何通过ssh检查ubuntu服务器上是否存在php和apache

    如何通过ssh检查Ubuntu服务器上apache是 否安装了php和mysql 另外如果安装的话在哪个目录 如果安装了其他软件包 例如 lighttpd 那么它在哪里 确定程序是否已安装的另一种方法是使用which命令 它将显示您正在搜索
  • 如何确保应用程序在 Linux 上持续运行

    我试图确保脚本在开发服务器上保持运行 它会整理统计数据并提供网络服务 因此它应该会持续存在 但一天中有几次 它会因未知原因而消失 当我们注意到时 我们只需再次启动它 但这很麻烦 并且某些用户没有权限 或专有技术 来启动它 作为一名程序员 我
  • 内核模式下的线程(和进程)与用户模式下的线程(和进程)有什么区别?

    我的问题 1 书中现代操作系统 它说线程和进程可以处于内核模式或用户模式 但没有明确说明它们之间有什么区别 2 为什么内核态线程和进程的切换比用户态线程和进程的切换花费更多 3 现在 我正在学习Linux 我想知道如何在LINUX系统中分别
  • waitpid() 的作用是什么?

    有什么用waitpid 它通常用于等待特定进程完成 或者如果您使用特殊标志则更改状态 基于其进程 ID 也称为pid 它还可用于等待一组子进程中的任何一个 无论是来自特定进程组的子进程还是当前进程的任何子进程 See here http l
  • Linux:如何设置进程的时区?

    我需要设置在 Linux 机器上启动的各个进程的时区 我尝试设置TZ变量 在本地上下文中 但它不起作用 有没有一种方法可以使用与系统日期不同的系统日期从命令行运行应用程序 这可能听起来很愚蠢 但我需要一种sandbox系统日期将被更改的地方
  • 我如何知道 C 程序的可执行文件是在前台还是后台运行?

    在我的 C 程序中 我想知道我的可执行文件是否像这样在前台运行 a out 或者像这样 a out 如果你是前台工作 getpgrp tcgetpgrp STDOUT FILENO or STDIN FILENO or STDERR FIL
  • 子目录中的头文件(例如 gtk/gtk.h 与 gtk-2.0/gtk/gtk.h)

    我正在尝试使用 GTK 构建一个 hello world 其中包括以下行 include
  • 确定我可以向文件句柄写入多少内容;将数据从一个 FH 复制到另一个 FH

    如何确定是否可以将给定数量的字节写入文件句柄 实际上是套接字 或者 如何 取消读取 我从其他文件句柄读取的数据 我想要类似的东西 n how much can I write w handle n read r handle buf n a
  • 快速像素绘图库

    我的应用程序以每像素的方式生成 动画 因此我需要有效地绘制它们 我尝试过不同的策略 库 但结果并不令人满意 尤其是在更高分辨率的情况下 这是我尝试过的 SDL 好的 但是慢 OpenGL 像素操作效率低下 xlib 更好 但仍然太慢 svg
  • 高效的内存屏障

    我有一个多线程应用程序 其中每个线程都有一个整数类型的变量 这些变量在程序执行期间递增 在代码中的某些点 线程将其计数变量与其他线程的计数变量进行比较 现在 我们知道在多核上运行的线程可能会无序执行 一个线程可能无法读取其他线程的预期计数器
  • 如何查询X11显示分辨率?

    这似乎是一个简单的问题 但我找不到答案 如何查询 通过 X11 存在哪些监视器及其分辨率 查看显示宏 http tronche com gui x xlib display display macros html and 屏幕宏 http
  • 为什么我可以在 /proc/pid/maps 输出中看到几个相同的段?

    测试在32位Linux上进行 代码如下 int foo int a int b int c a b return c int main int e 0 int d foo 1 2 printf d n d scanf d e return
  • Mcrt1.o和Scrt1.o有什么用?

    我坚持使用以下两个文件 即 Mcrt1 o 和 Scrt1 o 谁能帮我知道这两个文件的用途 如何使用它 我们以 gcrt1 o 为例 在使用 pg 选项编译进行性能测试时非常有用 谢谢 表格的文件 crt o总是 C 运行时启动代码 大部
  • Linux 上的基准测试程序

    对于一项任务 我们需要使用不同的优化和参数来对我们的实现进行基准测试 有没有一种可行的方法可以在Linux命令行 我知道时间 上使用不同的参数对小程序进行基准测试 从而为我提供CSV或类似内容的时间数据 输出可能类似于 Implementa
  • 当用户按下打印时运行脚本,并且在脚本结束之前不开始假脱机(linux,cups)

    我需要做的是结合用户按下打印来执行 python 程序 脚本 并且在该程序退出之前不要让打印作业假脱机 原因是打印驱动程序不是开源的 我需要更改用户设置 在本例中是部门 ID 和密码 通常是每个用户 但因为这是一个信息亭 具有相同帐户的不同
  • 限制 Imagemagick 使用的空间和内存

    我在 Rails 应用程序上使用 Imagemagick 使用 rmagick 但我的服务器 Ubuntu 不是很大 当我启动转换进程时 Imagemagick 占据了我的服务器 30GB HDD 的所有位置 内存 我想限制内存和 tmp
  • Linux 上的 RTLD_LOCAL 和dynamic_cast

    我们有一个由应用程序中的一些共享库构成的插件 我们需要在应用程序运行时更新它 出于性能原因 我们在卸载旧插件之前加载并开始使用新插件 并且只有当所有线程都使用旧插件完成后 我们才卸载它 由于新插件和旧插件的库具有相同的符号 我们dlopen

随机推荐

  • 单选框互斥且可同时取消选中

    单选框互斥且可同时取消选中 div class b div
  • 你值得拥有——流星雨下的告白(Python实现)

    目录 1 前言 2 霍金说移民外太空 3 浪漫的流星雨展示 4 Python代码 1 前言 我们先给个小故事 提一下大家兴趣 然后我给出论据 得出结论 最后再浪漫的流星雨表白代码奉上 还有我自创的一首诗 开始啦 2 霍金说移民外太空 霍金说
  • 最新版FreeRTOS的移植------STM32F103c8t6

    系列文章目录 用FlyMcu和USB转TTL给stm32中烧录程序 stm32C8 C6 文章目录 系列文章目录 前言 一 先决条件 二 使用步骤 1 获取FreeRTOS源码 2 将freeRTOS相关文件移植进keil工程 3 修改相关
  • 如何在ubuntu上安装gcc

    首先查一下 有没有gcc 如下 然后准备安装gcc 1 sudo是授权 apt是一个应用管理工具 apt是本地存了一份软件包信息的列表 包括依赖 大小 vesion等 目的是为了在安装软件的时候快速检测依赖 并自动安装相关依赖 但在安装之前
  • 数据库原理及应用(MySQL版)MySQL实验指导参考答案(实验一到实验八)

    点赞 收藏 慢慢看 lt 一 gt 实验一 CREATE DATABASE STUDENTSDB USE STUDENTSDB CREATE TABLE STUDENT INFO 学号 CHAR 4 NOT NULL PRIMARY KEY
  • 百度文库免费复制word文档的纯文字

    2022年5月11日测试过 以下方法能正常使用 1 在页面中安F12或者从浏览器的设置中找到开发人员工具 2 切换到控制台 然后点击右上角图标进入更多设置 3 在设置 首选项中 找到 调试程序 然后勾选 禁用javascript 4 做完上
  • 微信支付的收款功能被限制了怎么办,收款受限制怎么解除?

    使用小程序做电商 商城的微信支付的收款功能会遇到被限制的情况 直接影响用户下单后的付款操作 其实也不单单是小程序 商城APP中也会冒出类似的提示 遇到这种事情不要慌 根据具体的异常提示给出不同的解决方案 微信支付被限制的错误提示 我们列举两
  • 浅析数据库连接池(二)

    上一篇博客 主要是简单的介绍了普通数据库连接的过程以及耗费的资源 并且简单的谈了下连接池 这篇我们主要来看看数据库连接池的使用以及它最优的配置 总目录 1 数据库连接过程是怎样的 2 连接所占用的资源有哪些 3 连接池简介 4 连接池的使用
  • 用python画星空源代码

    from turtle import from random import random randint screen Screen width height 800 600 screen setup width height screen
  • 每天都在谈SOA和微服务,但你真的理解什么是服务吗?

    近几年来 我一直从事着和面向服务相关的底层软件研发工作 逐渐的形成了一些自己的看法 其中我觉得比较重要的看法就是服务需要一个更准确细致的定义 简单来说 服务的本质就是行为 业务活动 的抽象 为了更好的阐述新服务的概念 并方便与传统的SOA中
  • 【c语言】Hanoi塔问题

    一块板上有三根针 A B C A 针上套有 64 个大小不等的圆盘 大的在下 小的在上 如图 5 4 所示 要把这 64 个圆盘从 A 针移动 C 针上 每次只能移动一个圆盘 移动可以借 助 B 针进行 但在任何时候 任何针上的圆盘都必须保
  • 本周总结——勇敢尝试和体验

    人间烟火 生活趣事 快开学了 这一周都在写项目 键盘前一段时间坏掉了 当时买了保险 3年之内只换不修的 挺奇葩的 寄过去13天都没搭理我 也没说给换货 前几天忍不住打电话问了问 下午就发货了 昨天下午就领到了 看来有些东西还是需要主动问一问
  • 搞懂后序遍历!只需要这一篇

    讲讲对于后序遍历的理解 并通过题目加深理解 文章目录 核心 基础实现方式 104 二叉树的最大深度 111 二叉树的最小深度 222 完全二叉树的节点个数 110 平衡二叉树 101 对称二叉树 总结 核心 后序遍历的顺序为左右中 在一棵二
  • 在Ubuntu上安装Android-SDK的方法

    一 安装和配置Ubuntu系统 1 安装Ubuntu Desktop 14 04 x86 64 2 启用root账户 Ubuntu 14 04默认是不允许root账户登录的 在登录窗口只能看到普通用户和访客登录 在shell中运行以下命令即
  • 优化游标性能

    最好的改进光标性能的技术就是 能避免时就避免使用游标 摘自 Transact SQL权威指南 Ken Henderson 著 最好的改进光标性能的技术就是 能避免时就避免使用游标 SQL Server是关系数据库 其处理数据集比处理单行好得
  • ROS学习笔记(7):Navigation 导航

    目录 8 Navigation 8 1 Navigation工作框架 8 2 move base 8 3 Costmap 8 4 map server 8 5 AMCL 定位 8 Navigation Navigation是机器人最基本的功
  • 小程序显示富文本内容(wxparse)

    1 引入wxParse 下载地址https github com icindy wxParse 2 全局配置 3 获取富文本内容的js 加入如下内容
  • 在电力系统无功不足的情况下,为什么不宜采用调整变压器分头的办法来提高电压?

    在电力系统无功不足的情况下 为什么不宜采用调整变压器分头的办法来提高电压 答 当某一地区的电压由于变压器分头的改变而升高的时候 该地区所需的无功功率也增大了 这就可能扩大系统的无功缺额 从而导致整个系统的电压水平更加下降 从全局来看 这样做
  • Redis VS Memcached压力测试报告

    一 测试背景与目标 了解Redis和memcached在高并发条件下的响应时间 吞吐量情况 以及对于服务器的压力情况 包括CPU IO 网络 考察目前的memcached存储timeline的方式的在高并发条件下的响应时间 吞吐量 负载情况
  • flink大数据处理流式计算详解

    flink大数据处理 文章目录 flink大数据处理 二 WebUI可视化界面 测试用 三 Flink部署 3 1 JobManager 3 2 TaskManager 3 3 并行度的调整配置 3 4 区分 TaskSolt和parall