sparkstreamming 消费kafka(2)

2023-11-19

spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。

这两种方式分别是:

  • Receiver-base

  • Direct

 

一 、Receiver-base:

Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险。

Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:

spark streaming启动过后,会选择一台excetor作为ReceiverSupervior

1:Reciver的父级ReciverTracker分发多个job(task)到不同的executor,并启动ReciverSupervisor.

2:ReceiverSupervior会启动对应的实例reciver(kafkareciver,TwitterReceiver),并调用onstart()

3:kafkareciver在通过onstart()启动后就开启线程源源不断的接收数据,并交给ReceiverSupervior,通过ReceiverSupervior.store函数一条一条接收

4:ReceiverSupervior会调用BlockGenertor.adddata填充数据。

 

所有的中间数据都缓存在BlockGenertor

1:首先BlockGenertor维护了一个缓冲区,currentbuffer,一个无限长度的arraybuffer。为了防止内存撑爆,这个currentbuffer的大小可以被限制,通过设置参数spark.streaming.reciver.maxRate,以秒为单位。currentbuffer所使用的内存不是storage(负责spark计算过程中的所有存储,包括磁盘和内存),而是珍贵的计算内存。所以currentbuffer应该被限制,防止占用过多计算内存,拖慢任务计算效率,甚至有可能拖垮Executor甚至集群。

2:维护blockforpushing队列,它是等待被拉到到BlockManager的中转站。它是currentbuffer和BlockManager的中间环节。它里面的每一个元素其实就是一个currentbuffer。

3:维护两个定时器,其实就是一个生产-消费模式。blockintervaltimer定时器,负责生产端,定时将currentbuffer放进blockforpushing队列。blockforpushingthread负责消费端,定时将blockforpushing里的数据转移到BlockManager。

Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming计算引擎时,我们优先考虑采用此种方式来读取数据,具体的代码如下:

如上述代码,函数getKafkaInputStream提供了zookeeper, topic, groupId, numReceivers, partition以及ssc,其传入函数分别对应:

  • zookeeper: ZooKeeper连接信息

  • topic: Kafka中输入的topic信息

  • groupId: consumer信息

  • numReceivers: 打算开启的receiver个数, 并用来调整并发

  • partition: Kafka中对应topic的分区数

以上几个参数主要用来连接Kafka并读取Kafka数据。具体执行的步骤如下:

  • Kafka相关读取参数配置,其中 zookeeper.connect即传入进来的zookeeper参数;auto.offset.reset设置从topic的最新处开始读取数据;zookeeper.connection.timeout.ms指zookeepr连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes则是指单次读取数据的大小;group.id则是指定consumer。

  • 指定topic的并发数,当指定receivers个数之后,但是如果配置当receivers个数小于topic的partition个数时,在每个receiver上面会起相应的线程来读取不同的partition。(感觉这里是相当于一个分区分配一个线程来消费,但是实际上kafka消费者一个消费者可以消费多个分区数据,只是一个分区数据不能被多个消费者消费而已。)

  • 读取Kafka数据,numReceivers的参数在此用于指定我们需要多少Executor来作为Receivers,开多个Receivers是为了提高应用吞吐量。

  • union用于将多个Receiver读取的数据关联起来。

二、Direct:

这种方式是延迟的。也就是说当action真正触发时才会去kafka里接数据。因此不存在currentbuffer的概念。它把kafka每个分区里的数据,映射为KafkaRdd的概念。题外话,在structured streaming中,也已经向DataFrame和DataSet统一了,弱化了RDD的概念。

真正与kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括设备kafka各种参数,连接,获取分区,以及偏移量,设置偏移量范围等。

Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:

源码:
 

Spark Streaming提供了一些重载读取Kafka数据的方法,本文中关注两个基于Scala的方法,这在我们的应用场景中会用到,具体的方法代码如下:

  • 方法createDirectStream中,ssc是StreamingContext;kafkaParams的具体配置见Receiver-based之中的配置,与之一样;这里面需要指出的是fromOffsets ,其用来指定从什么offset处开始读取数据。

方法createDirectStream中,该方法只需要3个参数,其中kafkaParams还是一样,并未有什么变化,不过其中有个配置auto.offset.reset可以用来指定是从largest或者是smallest处开始读取数据;topic是指Kafka中的topic,可以指定多个。具体提供的方法代码如下

在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:

  • 应用启动。当程序开发并上线,还未消费Kafka数据,此时从largest处读取数据,采用第二种方法;

  • 应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景

总体方向上,我们采用以上方法满足我们的需要,当然具体的策略我们不在本篇中讨论,后续会有专门的文章来介绍。从largest或者是smallest处读Kafka数据代码实现如下:

程序失败重启的逻辑代码如下:

代码中的fromOffsets参数从外部存储获取并需要处理转换,其代码如下:

该方法提供了从指定offsets处读取Kafka数据。如果发现读取数据异常,我们认为是offsets失败,此种情况去捕获这个异常,然后从largest处读取Kafka数据。

 

Receive_base VS   Direct两种方式的优缺点:

Direct方式具有以下方面的优势:

1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应

2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。

3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。

4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。

6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

 

Direct方式的缺点:

  • 提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。

  • 监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。

 

Receive-base优点:

1、Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

 

Receive-base的缺点:

1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。

2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。

3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费

4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。

6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

sparkstreamming 消费kafka(2) 的相关文章

随机推荐

  • Vue引入elementUI组件

    Element Ul是饿了么前端团队推出的一款基于Vue js 2 0 的桌面端UI框架 一套为开发者 设计师和产品经理准备的基于 Vue 2 0 的桌面端组件库 手机端有对应框架是Mint UI 中文文档 http element cn
  • 基于Java实现的DES加密算法

    1 总结DES原理 DES算法为密码体制中的对称密码体制 又被称为美国数据加密标准 是1972年美国IBM公司研制的对称密码体制加密算法 明文按64位进行分组 密钥长64位 密钥事实上是56位参与DES运算 第8 16 24 32 40 4
  • fiddler APP抓包设置

    IOS设置 http t istester com 3000 istester 21Day src master Fiddler V1 0 21Day Fiddler 13 md 安卓设置 http t istester com 3000
  • Spring容器和应用上下文理解

    有了Spring之后 通过依赖注入的方式 我们的业务代码不用自己管理关联对象的生命周期 业务代码只需要按照业务本身的流程 走啊走啊 走到哪里 需要另外的对象来协助了 就给Spring说 我想要个对象 于是Spring就很贴心的给你个对象 听
  • 一文学会Canal怎么用

    文章目录 一 概念 1 什么是Canal 2 Canal的基本原理 二 Mysql配置 1 安装 2 开启mysql的binlog 3 mysql创建cannl用户并授权 三 安装配置ES kibana 四 安装canal server 五
  • jsp page 提示[page] is not properly terminated

    今天在编写jsp 页面 引入其他的jsp 的方式如下
  • 批量将xls转换成xlsx

    转载 https blog csdn net weixin 44674885 article details 88669259 1 xls和xlsx区别 xls格式 最大行数为65535 xlsx格式 最大行数为1048576 2 xls批
  • 海康、大华IpCamera RTSP地址和格式

    大家注意 我下面文章描述的都是海康老款摄像机的RTSP规则 现在新的DS 系列 摄像机型号为DS 开头的 的摄像机RTSP规则为 http blog csdn net xiejiashu article details 71786187 海
  • CentOS7编译内核

    下面记录了我在CentOS7上编译新内核的过程 背景 实验室的一台服务器上装且仅装了CentOS7 内核版本为3 10 0 327 el7 x86 64 我要在当前系统上 编译 安装内核4 1 16 搭建编译环境 sudo yum inst
  • 区块链学习笔记(六)——区块链的分类

    文章目录 一 强调 二 公有链 联盟链 私有链 1 公有链 2 联盟链 3 私有链 总结 一 强调 先做一下重复强调 区块链技术是集分布式存储 点对点传输 共识机制 加密算法 数据区块等概念于一体的新兴技术集合 二 公有链 联盟链 私有链
  • 基于ETest的航电系统通用测试平台

    随着电子技术的发展 航电系统在飞机整机中的重要性飞速提升 据统计 近年来航电系统在飞机出厂成本中的比例直线上升 航电系统研发成本已占飞机研制总成本的近30 并保持着持续扩大的趋势 测试保障作为航电产业链至关重要的一环 贯穿航空电子设备 研发
  • SpringBoot读取Resource下文件的四种方式

    SpringBoot读取Resource下文件的四种方式 1 ClassPathResource classPathResource new ClassPathResource static image a jpg InputStream
  • connect函数的用法

    作者 曾宏安 华清远见嵌入式学院讲师 在网络编程中 connect函数经常用来在套接字上初始化连接 无论是流式套接字还是数据报套接字都可以使用connect函数 但含义却不一样 下面我们分别来讨论一下 一 流式套接字 流式套接字通常使用的是
  • CSS字体样式属性(字体设置)

    font size 字号大小 font size属性用来设置字符 该属性的值有两种单位 1 相对长度单位 像素单位 px 2 绝对长度单位 使用非常少 font family 字体 font family属性用于设置字体 网页常用的字体 宋
  • 学习太极创客 — ESP8226 (十三)OTA

    视频链接 https www bilibili com video BV1L7411c7jw p 23 vd source b91967c499b23106586d7aa35af46413 资料链接 http www taichi make
  • bazel构建使用clang工具链

    最近使用clang工具构建bazel项目 官方文档给的step较为繁琐 这里暂时记录一下 以便后期可以直接去用 这里具体的规则不进行详细解释 具体看官方文档有关描述 查看预定义变量列表 使用如下命令 bazel info show make
  • 【华为OD统一考试A卷

    在线OJ 已购买本专栏用户 请私信博主开通账号 在线刷题 运行出现 Runtime Error 0Aborted 请忽略 华为OD统一考试A卷 B卷 新题库说明 2023年5月份 华为官方已经将的 2022 0223Q 1 2 3 4 统一
  • module ‘tensorflow‘ has no attribute ‘global_variables_initializer‘(问题已解决)

    最近在学深度学习 一开始就遇到了个很狗血的问题 总会报出例如下面的这种错误 y hat tf constant 36 name y hat y tf constant 39 name y loss tf Variable y y hat 2
  • xpath通过text()方式获取div节点下的文本存在bug

    环境 scrapy1 8 python3 7 3 div块如下 div class li b l span class money 12k 20k span 经验3 5年 大专 div 用形如 x response xpath div cl
  • sparkstreamming 消费kafka(2)

    spark streaming提供了两种获取方式 一种是同storm一样 实时读取缓存到内存中 另一种是定时批量读取 这两种方式分别是 Receiver base Direct 一 Receiver base Spark官方最先提供了基于R