Spark Streaming流式数据处理

2023-10-29

一、Spark Streaming 简介

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从Kafka, Kinesis, or TCP sockets等许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。还可以在数据流上应用Spark的MLlib 机器学习和 GraphX 图形处理算法。

在这里插入图片描述

        在内部,它的工作方式如下。Spark Streaming接收实时输入数据流,并将数据分成批处理,然后由Spark引擎进行处理,以生成批处理的最终结果流。

在这里插入图片描述

        Spark Streaming提供了称为离散化流DStream的高级抽象,它表示连续的数据流。可以根据来自Kafka和Kinesis等来源的输入数据流来创建DStream,也可以通过对其他DStream应用高级操作来创建DStream。在内部,DStream表示为RDD序列 。


二、简单的例子

  • 新建一份maven工程,导入依赖:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark_version}</version>
    </dependency>
  • 基于Spark Streaming的wordcount
package cn.wsj.mysparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NcWordCount {
  def main(args: Array[String]): Unit = {
  
    //创建SparkConf对象
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[4]")
	//创建StreamingContext对象与集群进行交互,Seconds(5)表示批处理间隔
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    
  	//创建一个将连接到主机名xxx端口xxxx的DStream,例如localhost:9999
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.237.160", 1234)
	//计算每批数据中的每个单词并打印
     line.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_).print()
	//启动Spark Streaming
    ssc.start()
    //等待计算终止
    ssc.awaitTermination()
  }
}

  • 这里监听了一个外部的端口,可以再linux虚拟机上进行如下操作:
# 安装网络工具netcat
ymu -y install nc
# 打开一个socket ,这里我开启的是1234端口
nc -lk 1234

在spark程序启动后,可以观察到时间戳的变化,此时来到socket端口,不断回车输入字母

e.g.

在这里插入图片描述

在spark客户端可以看到如下变化,每段时间都会显示每个单词的词频统计结果:

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe"
-------------------------------------------
Time: 1617191345000 ms
-------------------------------------------

-------------------------------------------
Time: 1617191350000 ms
-------------------------------------------
(spark,1)
(java,1)

-------------------------------------------
Time: 1617191355000 ms
-------------------------------------------
(java,2)

-------------------------------------------
Time: 1617191360000 ms
-------------------------------------------
(scala,2)
(java,1)


Process finished with exit code -1


三、Spark Streaming相关核心类

3.1 StreamingContext

        要初始化Spark Streaming程序,必须创建一个StreamingContext对象,该对象是所有Spark Streaming功能的主要入口点。

  • 通过SparkConf创建
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(this.getClass.getName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
  • 通过SparkContext创建,通常是使用已有的 SparkContext 来创建
    StreamingContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

定义context后,必须执行以下操作:

1.通过创建输入DStream定义输入源;
2.通过将转换和输出操作应用于DStream来定义流计算;
3.开始接收数据并使用进行处理streamingContext.start();
4.等待使用停止处理(手动或由于任何错误)streamingContext.awaitTermination();
5.可以使用手动停止处理streamingContext.stop()。


创建 StreamingContext需要注意下面几个问题:

1.一个 JVM 只能有一个 SparkContext 启动。意味着应用程序中不应该出现
两个 SparkContext;
2.一个 JVM 同时只能有一个 StreamingContext 启动。但一个 SparkContext可以创建多个 StreamingContext,只要上一个 StreamingContext 先用 stop(false)停止,再创建下一个即可;默认调用stop()方法时,会同时停止内部的SparkContext
3.StreamingContext 停止后不能再启动。也就是说调用 stop()后不能再 start();
4…StreamingContext 启动之后,就不能再往其中添加任何计算逻辑了。也就是说执行 start()方法之后,不能再使 DStream 执行任何算子.


3.2 离散流 Discretized Streams(DStreams)

        离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过对输入流进行转换而生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不变的分布式数据集的抽象。DStream中的每个RDD都包含来自特定间隔(batch interval)的数据,如下图所示。

在这里插入图片描述

        在DStream上执行的任何操作都转换为对基础RDD的操作。例如,第二点举例中在将行流转换为单词中,该flatMap操作应用于linesDStream中的每个RDD,以生成DStream的 wordsRDD。如下图所示:

在这里插入图片描述

这些底层的 RDD 转换是由 Spark 引擎计算的。DStream 操作隐藏了这些细节中的大部分,并为开发人员提供了更高级的 API。详见后文 DStream API。


3.3 Input DStreams 与 Receivers(接收器)

        输入DStream是表示从流源接收的输入数据流的DStream。在示例中,line输入DStream代表从netcat服务器接收的数据流。每个输入DStream都与一个Receiver对象 (Scala doc, Java doc)关联,该对象从源接收数据并将其存储在Spark的内存中以进行处理。

Spark Streaming提供了两类内置的流媒体源:

  • 基础源:StreamingContext API 中直接可用的资源。示例:文件系统和 Socket连接;
  • 高级源:像 Kafka、Flume、Kinesis 等资源可以通过额外的工具类获得。这些需要额外依赖项;

        可以在流处理程序中并行的接收多个数据流,即创建多个 Input DStreams。这将创建同时接收多个数据流的多个 receivers(接收器)。但需要注意,一个Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给Spark Streaming 的应用程序的所有核中的一个核(core);
        因此,一个Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s))。


需要注意的点:

在本地运行 Spark Streaming 程序时,不要使用“local”或“local[1]”作为主URL。这两种方法都意味着只有一个线程将用于在本地运行任务。如果使用基于接收器的输入 DStream(例如 sockets、Kafka、Flume 等),那么将使用单个线程来运行接收器。因此,在本地运行时,始终使用"local[n]"作为主 URL,其中运行 n 个接收方;
在集群上运行时,分配给 Spark Streaming 应用程序的内核数量必须大于接收器的数量。否则,系统将接收数据,但无法处理它


3.3.1 基础数据源

a.Socket(TCP Socket)

        示例使用的即是是 Socket 数据源;

  • textFileStream() 参 数 必 须 是 文 件 目 录 , 但 可 以 支 持 通 配 符 如"hdfs://namenode:8020/logs/2017/*";
  • Spark 将监视该目录任务新建的文件,一旦有新文件才会处理;
  • 所有文件要求有相同的数据格式;
  • 监视文件的修改时间而不是创建时间,注意更新文件内容不会被监视,一旦开始处理,这些文件必须不能再更改,因此如果文件被连续地追加,新的数据也不会被读取;
  • 处理后,在当前窗口中对文件的更改不会导致重新读取该文件。即: updates are ignored;

b.File Streams(文件流)

e.g.

import org.apache.spark.streaming._

val ssc =new StreamingContext(sc,Seconds(8))
ssc.textFileStream("/data/sparkstreaming/test/").print
ssc.start

c.RDDs 队列

        通常用于测试中。为了使用测试数据测试 Spark Streaming 应用程序,可以使用 streamingContext.queueStream(queueOfRDDs) 创建一个基于 RDDs 队列的DStream,每个进入队列的 RDD 都将被视为 DStream 中的一个批次数据,并且就像一个流进行处理。


3.3.2 高级数据源

a.Flume 数据源

Spark Streaming 集成Flume


b.Kafka 数据源

Spark Streaming 集成 Kafka


3.3.3 Streams based on Custom Receivers

        可以使用通过自定义接收器接收的数据流来创建DStream。


四、DStream API*

DStream API


五、Spark Streaming优化

Spark Streaming 优化


PS:如果有写错或者写的不好的地方,欢迎各位大佬在评论区留下宝贵的意见或者建议,敬上!如果这篇博客对您有帮助,希望您可以顺手帮我点个赞!不胜感谢!

原创作者:wsjslient

作者主页:https://blog.csdn.net/wsjslient


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming流式数据处理 的相关文章

  • 在 Spark mapPartitions 中使用 Java 8 并行流

    我试图了解 Spark 并行性中 Java 8 并行流的行为 当我运行下面的代码时 我期望输出大小为listOfThings与输入大小相同 但事实并非如此 我的输出中有时会缺少一些项目 这种行为并不一致 如果我只是遍历迭代器而不是使用par
  • 在 Spark 2.3.0 的结构化流中禁用 _spark_metadata

    我的结构化流应用程序正在写入镶木地板 我想摆脱它创建的 spark metadata 文件夹 我使用了下面的属性 看起来不错 conf spark hadoop parquet enable summary metadata false 当
  • 2 个具有相同消费者组 ID 的 Spark Stream 作业

    我正在尝试对消费者群体进行实验 这是我的代码片段 public final class App private static final int INTERVAL 5000 public static void main String ar
  • Spark Streaming 2.0.0 - 在负载下几天后冻结

    我们在带有 Spark 2 0 0 的 AWS EMR 5 0 0 上运行 从 125 个分片 Kinesis 流中使用 使用 2 个消息生成器提供 19k 个事件 秒 每条消息大小约为 1k 使用 20 台机器组成的集群进行消费 该代码有
  • Apache Zeppelin 0.6.1:运行 Spark 2.0 Twitter Stream 应用程序

    我有一个安装了 Spark 2 0 和 Zeppelin 0 6 1 的集群 自从上课以来TwitterUtils scala从 Spark 项目移至 Apache Bahir 我无法再在我的 Zeppelin 笔记本中使用 Twitter
  • Spark Streaming 应用程序失败并出现 KafkaException:字符串超出最大大小或出现 IllegalArgumentException

    TL DR 我非常简单的 Spark Streaming 应用程序在驱动程序中失败 并显示 KafkaException 字符串超出最大大小 我在执行程序中看到了相同的异常 但我还在执行程序日志的某处发现了 IllegalArgumentE
  • Spark Scala 流式 CSV

    我是 Spark Scala 的新手 我知道如何加载 CSV 文件 sqlContext read format csv 以及如何读取文本流和文件流 scc textFileStream file c path filename scc f
  • Spark Streaming:读取Kafka Stream并将其作为RDD提供以供进一步处理

    我目前有以下设置 应用程序将数据写入 Kafka gt SparkStreaming 读取存储的数据 始终从最早的条目读取 并转换为流 gt 应用程序需要此结果的 RDD 来训练 mllib 模型 我想基本上实现类似的目标https git
  • 如何优化 Apache Spark 应用程序中的 shuffle 溢出

    我正在运行一个有 2 个工作人员的 Spark 流应用程序 应用程序具有连接和并集操作 所有批次均已成功完成 但注意到 shuffle 溢出指标与输入数据大小或输出数据大小不一致 溢出内存超过 20 倍 Please find the sp
  • 从 kafka-Spark-Streaming 读取数据时获取空集

    大家好 我是 Spark Streaming 的新手 我正在尝试读取 xml 文件并将其发送到 kafka 主题 这是我的 Kafka 代码 它将数据发送到 Kafka console consumer Code package org a
  • Spark 流自定义指标

    我正在开发一个 Spark Streaming 程序 它检索 Kafka 流 对流进行非常基本的转换 然后将数据插入到数据库 如果相关 则为 voltdb 我正在尝试测量向数据库插入行的速率 我认为metrics http metrics
  • 可以在 Spark 批处理上创建模型并在 Spark 流中使用它吗?

    我可以在 Spark Ba tch 中创建模型并将其用于 Spark Streaming 进行实时处理吗 我在 Apache Spark 网站上看到了各种示例 其中训练和预测都是基于相同类型的处理 线性回归 构建的 我可以在 Spark B
  • 关于访问 Tuple2 内的字段时发生错误

    我正在尝试访问 Tuple2 中的字段 但编译器返回错误 该软件尝试在 kafka 主题中推送一个案例类 然后我想使用 Spark Streaming 恢复它 这样我就可以提供机器学习算法并将结果保存在 mongo 实例中 Solved 我
  • Spark流式批量查找数据

    我需要从 HDFS 上的文件查找 Spark 流作业中的一些数据 该数据由批处理作业每天获取一次 有没有 设计模式 为了这样的任务 如何在执行后立即重新加载内存中的数据 哈希图 每日更新 当查找数据时 如何连续服务流作业被抓取 一种可能的方
  • 可以触发流选择特定文件

    我的程序连续读取流hadoop文件夹 比如 hadoopPath 它从上面的文件夹中选取所有文件 我可以只显示该文件夹的特定文件类型吗 例如 hadoopPath log 我还有一个与 Spark 和流媒体相关的问题 Spark Strea
  • Spark清理shuffle溢出到磁盘

    我有一个循环操作 它生成一些 RDD 进行重新分区 然后进行聚合键操作 循环运行一次后 它会计算出最终的 RDD 该 RDD 会被缓存和检查点 并用作下一次循环的初始 RDD 这些 RDD 非常大 并且在每次迭代到达最终 RDD 之前都会生
  • 如何从迭代器创建 Spark RDD?

    为了说清楚 我不是从像这样的数组 列表中寻找RDD List
  • Spark 结构化流 - 从嵌套目录读取文件

    我有一个客户端将 CSV 文件放置在嵌套目录中 如下所示 我需要实时读取这些文件 我正在尝试使用 Spark 结构化流来做到这一点 Data user data 1 csv user data 2 csv user data 3 csv u
  • 执行器失败后 Spark 无法在 HDFS 中找到检查点数据

    我从 Kafka 传输数据如下 final JavaPairDStream
  • Jupyter Notebook 上未显示结构化流输出

    我有两个笔记本 第一个笔记本正在使用 tweepy 从 twitter 读取推文并将其写入套接字 其他笔记本正在使用 Spark 结构化流 Python 从该套接字读取推文并将其结果写入控制台 不幸的是我没有在 jupyter 控制台上得到

随机推荐

  • Springboot中创建拦截器

    目录 目的 实现过程 1 创建拦截器 2 注册拦截器 完整代码 目的 在Springboot项目中创建拦截器 在进入Controller层之前拦截请求 可对拦截到的请求内容做响应处理 如 校验请求参数 验证证书等操作 实现过程 1 创建拦截
  • PTS测试

    PTS性能测试 一 什么是性能测试 性能测试PTS Performance Testing Service 是一款简单易用 具备强大的分布式压测能力的SaaS压测平台 PTS可以模拟复杂的业务场景 并快速精准地调度不同规模的流量 同时提供压
  • JAVA中String及String常用的方法

    String string是表示字符串的字符串类 public class StringDemo public static void main String args 常见面试题 String s new String hello 问 如
  • Assuming drive cache: write through

    我也遇到过 关机重启就可以了
  • linux内存管理(十四)-内存OOM触发分析

    在内存分配路径上 当内存不足的时候会触发kswapd 或者内存规整 极端情况会触发OOM 来获取更多内存 在内存回收失败之后 会进行OOM OOM的入口是 alloc pages may oom 文件位于mm page alloc c中 s
  • 线代——基础解系 vs 特征向量

    基础解系 基础解系的概念是针对方程而言的 齐次线性方程组的解集的最大无关组称为齐次线性方程的基础解系 要求齐次线性方程组的通解 只需求出它的基础解系 例 特征向量 特征向量和特征值满足关系式 A A
  • 物理学家的Python

    特点 从基本的交互式 Python 开始 熟悉该语言 详细讨论了所有程序清单 介绍了 Matplotlib 图形 用于生成表示数据和函数图的图形 例如场线 考虑了动画函数图 有一章专门讨论代数方程和超越方程的数值解 讨论了基本的数学原理 并
  • 之前的一些笔记 py

    cmd清空 cls win e 打开我的电脑 好习惯 文件夹后面写数字表明软件的版本 怎么将2个分开的网页窗口合并成一个 鼠标按住一个窗口中的标签页不放 并向其它窗口的标题栏中拖动 即可把当前标签页合并到其它窗口中 注意 一定要按住标签页拖
  • 学习OpenCV——Surf(特征点篇)&flann

    Surf Speed Up Robust Feature Surf算法的原理 1 构建Hessian矩阵构造高斯金字塔尺度空间 其实surf构造的金字塔图像与sift有很大不同 就是因为这些不同才加快了其检测的速度 Sift采用的是DOG图
  • 我的博客即将同步至腾讯云+社区

    我的博客即将同步至腾讯云 社区 邀请大家一同入驻
  • OpenGL笔记之矩阵变换(Matrix Transformation)

    OpenGL笔记之矩阵变换 Matrix Transformation 分类 OpenGL 2012 08 06 21 21 3968人阅读 评论 1 收藏 举报 transformation matrix math 图形 影视 本文是学习
  • nftqin网站登录参数s算法逆向

    原文转载自http www itfvck com 7488 html 抓包数据 method POST authority api nftqin com scheme https path api app passwordLogin con
  • java 代码静态检查_[原创]Java静态代码检查工具介绍

    原创 Java静态代码检查工具介绍 一 什么是静态代码检查 静态代码分析是指无需运行被测代码 仅通过分析或检查源程序的语法 结构 过程 接口等来检查程序的正确性 找出代码隐藏的错误和缺陷 如参数不匹配 有歧义的嵌套语句 错误的递归 非法计算
  • 把win10系统迁移至ssd后,开机时电脑默认不会启动ssd里面系统的问题解决方法

    把win10系统迁移至ssd后 开机时电脑默认不会启动ssd里面系统的问题解决方法 电脑用了两三年了卡到不行 开机要两三分钟 刚开机后两三分钟内卡到都无法操作 于是乎 买了一块三星的860 evo的ssd 固态硬盘 打算对旧机升级一下 简单
  • elasticsearch之嵌套对象、父子文档

    一 嵌套对象 es并非关系型数据库 它并不擅长关系型数据库的join操作 在存储数据时 用冗余数据替代查询时的关联 如blog和comments 如果用关系型数据库 会将blog存一个表 comments存另一个表 然后将这两个表关联起来
  • 华兴数控g71外圆循环编程_数控车床加工时的复合循环指令G70,G71,G72,G73

    复合循环指令应用在切除非一次加工即能加工到规定尺寸的场合 主要在粗车和多次切螺纹的情况下使用 它主要有以下几种 外径 内径粗车循环指令G71 该指令将工件切削到精加工之前的尺寸 精加工前工件形状及粗加工的刀具路径由系统根据精加工尺寸自动设定
  • DataWorks数据埋点的设计及未来发展的思考

    什么是前端埋点 马总曾经说过现在是DT时代 大数据的时代 数据已经成为一家公司最宝贵的财富 越来越多的互联网公司开始重视数据的应用 数据应用的过程是 数据收集 gt 数据整理 数据同步 gt 数据分析 gt 数据可视化 前端埋点是用户行为数
  • ChatGPT的主要应用场景例子

    ChatGPT是一种基于深度学习技术的大型语言模型 它可以根据用户提供的输入信息 生成自然语言文本或响应 这种技术可以应用于很多领域 下面将详细介绍ChatGPT在以下几个方面的应用 以下是使用过程中的一些应用场景对话记录 欢迎补充更多的应
  • 什么是ioc

    什么叫ioc 1 ioc叫做控制反转 是面向对象的一种设计方式 2 把对象的创建和对象之间的调用过程 交给spring管理 3 目的 为了使耦合度降低 耦合度 我有多个service类 都需要调用一个dao类 当我修改这个dao类的位置时
  • Spark Streaming流式数据处理

    目录 一 Spark Streaming 简介 二 简单的例子 三 Spark Streaming相关核心类 3 1 StreamingContext 3 2 离散流 Discretized Streams DStreams 3 3 Inp