Spark Streaming(组件、updateStateByKey、Windows)总结

2023-11-01

1. SparkStreaming 是什么

  • 它是一个可扩展高吞吐具有容错性流式计算框架

    吞吐量:单位时间内成功传输数据的数量

  • 之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

在这里插入图片描述

  • 但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。
    在这里插入图片描述

2. 实时计算框架对比

  • Storm

    • 流式计算框架,来一条处理一条

    • 以record为单位处理数据,支持micro-batch方式(Trident)

    • 对python不友好

  • flink

    • 流式计算框架,来一条处理一条
    • 比spark streaing速度快
  • Spark

    • 批处理计算框架,间隔一段时间,获取一次数据
    • 以RDD为单位处理数据,支持micro-batch流式处理数据(Spark Streaming
    • 实时性稍差,但是能处理的数据量更大
    • pyspark
  • 对比:

    • 吞吐量:Spark Streaming优于Storm
    • 延迟:Spark Streaming差于Storm

3. Spark Streaming组件

  • Streaming Context
    • 流上下文 通过Streaming Context 可以连接数据源获取数据
    • 通过spark context 可以获取到streaming context
    • 在创建Streaming Context 需要指定一个时间间隔(micro batch)
    • Streaming Context调用了stop方法之后,就不能再次调 start(),需要重新创建一个Streaming Context
    • 一个SparkContext创建一个Streaming Context
    • streaming Context上调用Stop方法,默认会把spark context也关掉
    • 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 对DStream中数据处理的逻辑要写在Streaming Context开启之前 一旦Streaming Context调用了start方法 就不能再添加新的数据处理逻辑
  • DStream(离散流)
    • Streaming Context 连接到不同的数据源获取到的数据 抽象成DStream模型
    • 代表一个连续的数据流
    • 一系列连续的RDD组成
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume

4. Spark Streaming 编码实战(无状态)

4.1 Spark Streaming编码步骤:

  1. 创建一个StreamingContext
  2. 从StreamingContext中创建一个数据对象
  3. 对数据对象进行Transformations操作
  4. 输出结果
  5. 开始和停止

4.2 利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"

os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext("local[2]",appName="NetworkWordCount")
    #参数2:指定执行计算的时间间隔
    ssc = StreamingContext(sc, 1)
    #监听ip,端口上的上的数据
    lines = ssc.socketTextStream('localhost',9999)
    #将数据按空格进行拆分为多个单词
    words = lines.flatMap(lambda line:line.split(' '))
    #将单词转换为(单词,1)的形式
    pairs = words.map(lambda word:(word,1))
    #统计单词个数
    wordCounts = pairs.reduceByKey(lambda x,y:x+y)
    #打印结果信息,会使得前面的transformation操作执行 类似于action
    wordCounts.pprint()
    #启动StreamingContext
    ssc.start()
    #等待计算结束 这里在jupyter notebook交互式环境中才需要加
    ssc.awaitTermination()

可视化查看效果: 主机地址:4040 点击streaming,查看效果

5. Spark Streaming的状态操作

  • Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

    • 无状态:指的是每个时间片段的数据之间是没有关联的。
  • 需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作,一般超过一天都是用RDD或Spark SQL来进行离线批处理

  • 在Spark Streaming中存在两种状态操作

    • UpdateStateByKey
    • Windows操作
  • 使用有状态的transformation,需要开启Checkpoint

    • spark streaming 的容错机制
    • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

5.1 updateStateByKey

步骤

  • 首先,要定义一个state,可以是任意的数据类型
  • 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  • 对于每个新出现的key,也会执行state更新函数

5.2 案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")

#定义state更新函数
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
    
counts.pprint()

ssc.start()
ssc.awaitTermination()

5.3 Windows

  • 窗口长度L:运算的数据量
  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

在这里插入图片描述

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的
  • 窗口的长度控制考虑前几批次数据量
  • 默认为批处理的滑动间隔来确定计算结果的频率
  • 在这里插入图片描述

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

在这里插入图片描述

5.4 案例 windows

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
    country_name = line.strip()
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'

    return (output, 1)

if __name__ == "__main__":
	#定义处理的时间间隔
    batch_interval = 10 # base time unit (in seconds)
    #定义窗口长度
    window_length = 6 * batch_interval
    #定义滑动时间间隔
    frequency = 1 * batch_interval

    #获取StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
		sc = spark.sparkContext
		ssc = StreamingContext(sc, batch_interval)
    
    #需要设置检查点
    ssc.checkpoint("checkpoint")

    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    #调用reduceByKeyAndWindow,来进行窗口函数的调用
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
	#输出处理结果信息
    window_counts.pprint()

    ssc.start()
    ssc.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming(组件、updateStateByKey、Windows)总结 的相关文章

  • 为什么从 Pandas 1.0 中删除了日期时间?

    我在 pandas 中处理大量数据分析并每天使用 pandas datetime 最近我收到警告 FutureWarning pandas datetime 类已弃用 并将在未来版本中从 pandas 中删除 改为从 datetime 模块
  • 如何用python脚本控制TP LINK路由器

    我想知道是否有一个工具可以让我连接到路由器并关闭它 然后从 python 脚本重新启动它 我知道如果我写 import os os system ssh l root 192 168 2 1 我可以通过 python 连接到我的路由器 但是
  • 安装了 32 位的 Python,显示为 64 位

    我需要运行 32 位版本的 Python 我认为这就是我在我的机器上运行的 因为这是我下载的安装程序 当我重新运行安装程序时 它会将当前安装的 Python 版本称为 Python 3 5 32 位 然而当我跑步时platform arch
  • Python getstatusoutput 替换不返回完整输出

    我发现了这个很棒的替代品getstatusoutput Python 2 中的函数在 Unix 和 Windows 上同样有效 不过我觉得这个方法有问题output被构建 它只返回输出的最后一行 但我不明白为什么 任何帮助都是极好的 def
  • 使用 Python 从文本中删除非英语单词

    我正在 python 上进行数据清理练习 我正在清理的文本包含我想删除的意大利语单词 我一直在网上搜索是否可以使用像 nltk 这样的工具包在 Python 上执行此操作 例如给出一些文本 Io andiamo to the beach w
  • 使用字典映射数据帧索引

    为什么不df index map dict 工作就像df column name map dict 这是尝试使用index map的一个小例子 import pandas as pd df pd DataFrame one A 10 B 2
  • 您可以格式化 pandas 整数以进行显示,例如浮点数的“pd.options.display.float_format”?

    我见过this https stackoverflow com questions 18404946 py pandas formatdataframe and this https stackoverflow com questions
  • Python beautifulsoup 仅限 1 级文本

    我看过其他 beautifulsoup 得到相同级别类型的问题 看来我的有点不同 这是网站 我正试图拿到右边那张桌子 请注意表的第一行如何展开为该数据的详细细分 我不想要那个数据 我只想要最顶层的数据 您还可以看到其他行也可以展开 但在本例
  • 如何使用python在一个文件中写入多行

    如果我知道要写多少行 我就知道如何将多行写入一个文件 但是 当我想写多行时 问题就出现了 但是 我不知道它们会是多少 我正在开发一个应用程序 它从网站上抓取并将结果的链接存储在文本文件中 但是 我们不知道它会回复多少行 我的代码现在如下 r
  • Docker 中的 Python 日志记录

    我正在 Ubuntu Web 服务器上的 Docker 容器中测试运行 python 脚本 我正在尝试查找由 Python Logger 模块生成的日志文件 下面是我的Python脚本 import time import logging
  • 加快网络抓取速度

    我正在使用一个非常简单的网络抓取工具抓取 23770 个网页scrapy 我对 scrapy 甚至 python 都很陌生 但设法编写了一个可以完成这项工作的蜘蛛 然而 它确实很慢 爬行 23770 个页面大约需要 28 小时 我看过scr
  • Jupyter Notebook 找不到 Python 模块

    不知道发生了什么 但每当我使用 ipython 氢 原子 或 jupyter 笔记本时都找不到任何已安装的模块 我知道我安装了 pandas 但笔记本说找不到 我应该补充一点 当我正常运行脚本时 python script py 它确实导入
  • 从 NumPy ndarray 中选择行

    我只想从 a 中选择某些行NumPy http en wikipedia org wiki NumPy基于第二列中的值的数组 例如 此测试数组的第二列包含从 1 到 10 的整数 gt gt gt test numpy array nump
  • 如何断言 Unittest 上的可迭代对象不为空?

    向服务提交查询后 我会收到一本字典或一个列表 我想确保它不为空 我使用Python 2 7 我很惊讶没有任何assertEmpty方法为unittest TestCase类实例 现有的替代方案看起来并不正确 self assertTrue
  • Pandas 将多行列数据帧转换为单行多列数据帧

    我的数据框如下 code df Car measurements Before After amb temp 30 268212 26 627491 engine temp 41 812730 39 254255 engine eff 15
  • 为什么 Pickle 协议 4 中的 Pickle 文件是协议 3 中的两倍,而速度却没有任何提升?

    我正在测试 Python 3 4 我注意到 pickle 模块有一个新协议 因此 我对 2 个协议进行了基准测试 def test1 pickle3 open pickle3 wb for i in range 1000000 pickle
  • Python:XML 内所有标签名称中的字符串替换(将连字符替换为下划线)

    我有一个格式不太好的 XML 标签名称内有连字符 我想用下划线替换它 以便能够与 lxml objectify 一起使用 我想替换所有标签名称 包括嵌套的子标签 示例 XML
  • 在本地网络上运行 Bokeh 服务器

    我有一个简单的 Bokeh 应用程序 名为app py如下 contents of app py from bokeh client import push session from bokeh embed import server do
  • Pandas 每周计算重复值

    我有一个Dataframe包含按周分组的日期和 ID df date id 2022 02 07 1 3 5 4 2022 02 14 2 1 3 2022 02 21 9 10 1 2022 05 16 我想计算每周有多少 id 与上周重
  • 使用 z = f(x, y) 形式的 B 样条方法来拟合 z = f(x)

    作为一个潜在的解决方案这个问题 https stackoverflow com questions 76476327 how to avoid creating many binary switching variables in gekk

随机推荐

  • Python3.0 基础系列教程(目录)

    准备写一篇python的系列教程 目录暂定如下 如果有更好的建议 麻烦下方留言 如无意外 大约一周2 3篇 敬请期待 环境安装篇 1 下载并安装Python3 0 2 第一个python程序 3 安装集成开发环境ide 基础知识篇 基本数据
  • go:chan分为阻塞和非阻塞

    一句话总结 ch make chan int 由于没有缓冲发送和接收需要同步 ch make chan int 2 有缓冲不要求发送和接收操作同步 1 无缓冲时 发送阻塞直到数据被接收 接收阻塞直到读到数据 package main imp
  • 【华为OD机试真题】单核CPU任务调度

    单核CPU任务调度 考察的知识的点就一个优先队列 队列排序 题目描述 现在有一个CPU和一些任务需要处理 已提前获知每个任务的任务D 优先级 所需执行时间和到达时间 CPU同时只能运行一个任务 请编写一个任务调度程序 采用 可抢占优先权调度
  • Scala 的安装教程

    Scala 语言可以运行在Window Linux Unix Mac OS X等系统上 Scala是基于java之上 大量使用java的类库和变量 使用 Scala 之前必须先安装 Java gt 1 5版本 Mac OS X 和 Linu
  • git 提交新的工程

    git cmd exe 环境 windows git 提交新的工程 查看版本号 E software Git gt git version git version 2 15 1 windows 2 添加用户配置 E software Git
  • Error: EBUSY: resource busy or locked, lstat ‘D:\DumpStack.log.tmp‘

    问题 vue项目启动成功后报错 Error EBUSY resource busy or locked lstat D DumpStack log tmp 解决 1 npm cache clean force 2 npm install
  • 关于Apache/Tomcat/JBOSS/Nginx/lighttpd/Jetty等一些常见服务器的区别比较和理解

    今天是个很丰富的日子 早上一上班 第一个听到的惊爆消息就是楷子得了肠胃炎 一大早去医院挂水了 随后风胜和笑虎也没来 后来得知他们俩去去华星现代产业园参加培训 内容是关于Apache与Nginx的 于是乎 我非常感兴趣地查了一下培训用的PPT
  • C#按钮事件中有循环,用另一个按钮控制停止,暂停,继续程序执行

    首先在窗体上有 lable1 运行显示 button1 开始 button2 暂停和继续 button3 停止 窗体上还放Timer控件timer1 代码实现如下 using System using System Collections
  • vue+elementUI table表格嵌套表单,功能包含联动下拉框、动态增加行

    一 需求说明 vue elementUI table表格里嵌套表单 支持动态增加一行和删除一行 含checkbox复选框 联动下拉框 不同的活动名称 所对应的活动选项下拉框的值不同 针对不同的选项 值的表现形式也要发生对应的变化 如 日期形
  • An attempt was made to call a method that does not exist. The attempt was made from the following lo

    APPLICATION FAILED TO START Description An attempt was made to call a method that does not exist The attempt was made fr
  • springboot学习(三)——使用HttpMessageConverter进行http序列化和反序列化

    以下内容 如有问题 烦请指出 谢谢 对象的序列化 反序列化大家应该都比较熟悉 序列化就是将object转化为可以传输的二进制 反序列化就是将二进制转化为程序内部的对象 序列化 反序列化主要体现在程序I O这个过程中 包括网络I O和磁盘I
  • 链塔智库

    链塔智库整理最近一周内区块链相关政策 业内动态 人物观点 为大家梳理呈现各个领域的最新发展 目录 一 各地政策要闻 四川 探索建立基于区块链技术的数字资产交易平台 首个区块链领域国家标准在成都举行首场征求意见会 重庆出台优化工业园区规划建设
  • 云计算概念详解

    1 云计算的定义 1 云计算是一种能够通过网络以便利的按需的方式获取云计算资源 网络 服务器 存储 应用和服务 的模式 2 这些资源来自一个共享的 可配置的资源池 并能够快速获取和释放 提供资源的网络称为云 3 云模式能够提高可用性 4 云
  • IDEA下载与安装,保姆级教程

    这里写自定义目录标题 1 搜索idea 2 选择官方网站 3 官网进入下载页面 4 版本选择问题 5 Ultimate和Community对比 6 下载 7 安装 1 搜索idea 2 选择官方网站 以前idea的官网后面有官网俩字 现在没
  • Open NMT-py 玩具模型使用说明

    前排提示 本文仅适合纯萌新玩家 算是官方指南的补档 大佬请直接关闭网页 避免浪费时间 截至到2023 3 15 最新的OpenNMT py环境要求 Python gt 3 7 PyTorch gt 1 9 0 如果是老版本的OpenNMT
  • https网络编程——使用openssl库自建根证书

    参考 如何自建根证书 使用openssl库自建根证书带图详解 地址 https qingmu blog csdn net article details 108217572 spm 1001 2014 3001 5502 目录 根证书的普通
  • spring boot最新教程(三):Spring Boot整合JdbcTemplate以及事务管理

    一 JdbcTemplate的使用 Spring对数据库的操作在jdbc上面做了深层次的封装 使用spring的注入功能 可以把DataSource注册到JdbcTemplate之中 JdbcTemplate 是在JDBC API基础上提供
  • python:日期时间处理

    目录 一 time模块 二 秒转换为时分秒 三 计算前后几天的日期 一 time模块 1 time strftime format t 格式 说明 a 本地 locale 简化星期名称 A 本地完整星期名称 b 本地简化月份名称 B 本地完
  • 【STM32Cube】学习笔记(六):DHT11温湿度传感器

    文章目录 摘要 一 简介 1 DHT11数字温湿度传感器 2 DHT11性能参数 2 DHT11数据结构 2 DHT11传输时序 二 硬件电路设计 1 模块内部电路 2 与单片机相连接电路 三 软件设计 1 CubeMX配置 2 CubeI
  • Spark Streaming(组件、updateStateByKey、Windows)总结

    Spark Streaming 1 SparkStreaming 是什么 2 实时计算框架对比 3 Spark Streaming组件 4 Spark Streaming 编码实战 无状态 4 1 Spark Streaming编码步骤 4