1. SparkStreaming 是什么
- 但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。
2. 实时计算框架对比
-
Storm
-
flink
- 流式计算框架,来一条处理一条
- 比spark streaing速度快
-
Spark
- 批处理计算框架,间隔一段时间,获取一次数据
- 以RDD为单位处理数据,支持micro-batch流式处理数据(Spark Streaming
- 实时性稍差,但是能处理的数据量更大
- pyspark
-
对比:
- 吞吐量:Spark Streaming优于Storm
- 延迟:Spark Streaming差于Storm
3. Spark Streaming组件
- Streaming Context
- 流上下文 通过Streaming Context 可以连接数据源获取数据
- 通过spark context 可以获取到streaming context
- 在创建Streaming Context 需要指定一个时间间隔(micro batch)
- Streaming Context调用了stop方法之后,就不能再次调 start(),需要重新创建一个Streaming Context
- 一个SparkContext创建一个Streaming Context
-
streaming Context上调用Stop方法,默认会把spark context也关掉
- 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
- 对DStream中数据处理的逻辑要写在Streaming Context开启之前 一旦Streaming Context调用了start方法 就不能再添加新的数据处理逻辑
- DStream(离散流)
- Streaming Context 连接到不同的数据源获取到的数据 抽象成DStream模型
- 代表一个连续的数据流
- 由一系列连续的RDD组成
- 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
- 数据源
4. Spark Streaming 编码实战(无状态)
4.1 Spark Streaming编码步骤:
- 创建一个StreamingContext
- 从StreamingContext中创建一个数据对象
- 对数据对象进行Transformations操作
- 输出结果
- 开始和停止
4.2 利用Spark Streaming实现WordCount
需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。
1,需要安装一个nc工具:sudo yum install -y nc
2,执行指令:nc -lk 9999 -v
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
sc = SparkContext("local[2]",appName="NetworkWordCount")
#参数2:指定执行计算的时间间隔
ssc = StreamingContext(sc, 1)
#监听ip,端口上的上的数据
lines = ssc.socketTextStream('localhost',9999)
#将数据按空格进行拆分为多个单词
words = lines.flatMap(lambda line:line.split(' '))
#将单词转换为(单词,1)的形式
pairs = words.map(lambda word:(word,1))
#统计单词个数
wordCounts = pairs.reduceByKey(lambda x,y:x+y)
#打印结果信息,会使得前面的transformation操作执行 类似于action
wordCounts.pprint()
#启动StreamingContext
ssc.start()
#等待计算结束 这里在jupyter notebook交互式环境中才需要加
ssc.awaitTermination()
可视化查看效果: 主机地址:4040 点击streaming,查看效果
5. Spark Streaming的状态操作
-
Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。
- 无状态:指的是每个时间片段的数据之间是没有关联的。
-
需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作,一般超过一天都是用RDD或Spark SQL来进行离线批处理
-
在Spark Streaming中存在两种状态操作
- UpdateStateByKey
- Windows操作
-
使用有状态的transformation,需要开启Checkpoint
- spark streaming 的容错机制
- 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复
5.1 updateStateByKey
步骤:
- 首先,要定义一个state,可以是任意的数据类型
- 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
- 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
- 对于每个新出现的key,也会执行state更新函数
5.2 案例:updateStateByKey
需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来
代码
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")
#定义state更新函数
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
counts.pprint()
ssc.start()
ssc.awaitTermination()
5.3 Windows
- 窗口长度L:运算的数据量
- 滑动间隔G:控制每隔多长时间做一次运算
每隔G秒,统计最近L秒的数据
操作细节
- Window操作是基于窗口长度和滑动间隔来工作的
- 窗口的长度控制考虑前几批次数据量
- 默认为批处理的滑动间隔来确定计算结果的频率
典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。
5.4 案例 windows
监听网络端口的数据,每隔3秒统计前6秒出现的单词数量
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
def get_countryname(line):
country_name = line.strip()
if country_name == 'usa':
output = 'USA'
elif country_name == 'ind':
output = 'India'
elif country_name == 'aus':
output = 'Australia'
else:
output = 'Unknown'
return (output, 1)
if __name__ == "__main__":
#定义处理的时间间隔
batch_interval = 10 # base time unit (in seconds)
#定义窗口长度
window_length = 6 * batch_interval
#定义滑动时间间隔
frequency = 1 * batch_interval
#获取StreamingContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, batch_interval)
#需要设置检查点
ssc.checkpoint("checkpoint")
lines = ssc.socketTextStream('localhost', 9999)
addFunc = lambda x, y: x + y
invAddFunc = lambda x, y: x - y
#调用reduceByKeyAndWindow,来进行窗口函数的调用
window_counts = lines.map(get_countryname) \
.reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
#输出处理结果信息
window_counts.pprint()
ssc.start()
ssc.awaitTermination()