Structured Streaming Programming Guide-2.3.0

2023-05-16

概览
结构化流是一个可伸缩和容错的流处理引擎,内置在Spark SQL 引擎中。你可以以对静态数据表达批处理计算的方式表达你的流计算。Spark SQL引擎会注意逐渐/持续第运行,并随着流数据不断到来而更新最终的结果。你可以使用Scala/Java/Python/R语言的Dataset/DataFrame API来表达流愈合、event-time windows、stream-to-batch joins等等。计算在相同的优化的Spark SQL引擎上执行。最终,系统通过checkpointing和write ahead logs保证端到端仅仅一次容错。总之,结构化流提供快速、可伸缩、容错、端到端一次流处理,在用户没必要思考流的情况下。
本质上,默认情况下,结构化流查询使用小批量的处理引擎来处理,处理数据流作为一系列小批量任务因而获得100ms的端到端延迟和仅仅一次容错保证。然而,从Spark2.3开始,我们引入了一种新的低延时处理模式,称作持续处理,能够获得端到端低至1ms的延迟和保证至少一次。不需要改变你查询中的Dataset/DataFrame操作,你可以基于你的程序需要选择这种模式。
在这个指南中,我们将带着你浏览编程模式和APIs。我们会主要使用默认的小批量处理模式来解释概念,然后讨论持续处理模式。首先,让我们从一个简单的结构化流查询例子即流式单词计数开始。
Quick Example
我们假设你想维护一个运行的单词计数任务,文本数据来自一个监听TCP socket的数据服务。让我们看下你如何使用结构化流来表达它。你可以看到Scala/Java/Python/R语言的全部代码。如果你下载了Spark,你可以直接运行示例。无论如何,让我们一步一步看下示例,理解它是如何工作的。首先,我们必须引入必要的类,创建一个本地SparkSession,所有功能的起点都和Spark相关。

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.Iterator;
SparkSession spark=SparkSession.builder().appName("JavaStructuredNetworkWordCount").getOrCreate();

接下来,创建一个streaming DataFrame,代表从监听本地9999端口的服务收到的文本数据,然后转换DataFrame来计算单词计数。

//create DataFrame representing the stream of input lines from connecting to localhost:9999
Dataset<Row> lines=spark.readStream().format("socket").option("host","localhost").option("port",9999).load();
//split the lines into words
Dataset<String> words=lines.as(Encoders.String()).flatMap((FlatMapFunction<String,String>) x -> Arrays.asList(x.split(" ")).iterator(),Encoders.String());
//generate running word count
Dataset<Row> wordCounts=words.groupBy("value").count();
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Structured Streaming Programming Guide-2.3.0 的相关文章

  • PHP 从视频中提取音频

    我需要一种从某些视频中提取音频的方法 用 PHP 我有来自 YouTube 的视频流 所以我真的很喜欢它 如果它是动态流 而不是我必须将其保存到临时目录并在那里处理它 尽管这是可以接受的 谢谢 Isaac Waller编辑 更具体地说 我有
  • 如何在不先读取整个图像的情况下就地缩放流式位图?

    我有一个图像密集型的 Android 应用程序 我目前正在使用Bitmap createScaledBitmap http developer android com reference android graphics Bitmap ht
  • 使用 PHP 进行实时视频流传输

    我有一个 PHP AJAX MYSQL 聊天应用程序 我想将视频聊天添加到我的应用程序中 如何在 PHP 应用程序中创建用于实时视频会议 聊天的实时视频流 如果我想构建这样一个系统 我需要了解哪些关键术语 首先使用 PHP 是个好主意吗 有
  • 命令行流网络摄像头,带有来自 Ubuntu 服务器的 WebM 格式的音频

    我正在尝试从连接到无头 Ubuntu 服务器 运行 Maverick 10 10 的网络摄像头传输视频和音频 我希望能够以 WebM 格式 VP8 视频 OGG 进行流式传输 带宽有限 因此流必须低于 1Mbps 我尝试过使用 FFmpeg
  • 对 DStream 进行类型参数化

    Can a DStream have type parameters 如果是 怎么办 当我尝试时lazy val qwe mStream mapWithState stateSpec on myDStream DStream A B 类参数
  • 将大数据从 WCF 服务发送到客户端的最佳方式是什么?

    我有一项返回大量数据的特定服务 WCF 中处理此问题的最佳实践和选项是什么 这些大数据在完成所有过滤后返回 因此无法再进行过滤 数据可以达到GB 我确实知道系统可以处理的数据量是有限的 但在上述场景中 您会推荐哪些选项 替代方案 使用流媒体
  • Java 增量流式 JSON 库

    谁能推荐一个 Java 的 JSON 库 它允许我以非阻塞的方式提供传入的数据块 我已读完更好的 Java JSON 库 https stackoverflow com questions 338586 a better java json
  • WCF Web服务流响应的最佳实践

    我正在尝试从 WCF Web 服务中提取大量数据 请求相当小 而响应消息将非常大 目前 由于 IIS6 对其可分配的内存 1 4GB 有限制 Web 服务正在引发 SystemOutOfMemory 异常 我在一些博客中读到 实施流式传输可
  • C# 中的 StreamReader 和缓冲区

    我对 StreamReader 的缓冲区使用有疑问 这里 http msdn microsoft com en us library system io streamreader aspx http msdn microsoft com e
  • 结构化 Spark 流指标检索

    我有一个具有结构化 Spark 流的应用程序 我想获取一些指标 例如调度延迟 延迟等 通常 此类指标可以在 Spark UI Streaming 选项卡中找到 但是 结构化流不存在此类功能我知道 那么如何获取这些指标值呢 目前 我尝试使用查
  • 使用 commons-exec 流式输出?

    谁能给我一个例子来说明如何流式传输外部程序的输出DefaultExecutor 我没有找到任何描述如何执行此操作的文档 我的外部进程将运行几个小时 因此仅获取所有输出数据是不可行的 它必须被流式传输 注意 此解决方案是同步的 因此它不会流式
  • 如何使用 Scala Stream 类读取大型 CSV 文件?

    如何使用 Scala Stream 读取大型 CSV 文件 gt 1 Gb 你有代码示例吗 或者您会使用不同的方式来读取大型 CSV 文件而不先将其加载到内存中吗 只需使用Source fromFile getLines正如你已经说过的 这
  • python-twitter 流 api 支持/示例

    我正在与python twitter http code google com p python twitter 并意识到 Twitter 提供流媒体api http dev twitter com pages streaming api实
  • 使用 ImageIO 发送图像流?

    我设置了一个 ServerSocket 和一个 Socket 因此 ServerSocket 使用 ImageIO write 发送图像流 并且 Socket 尝试读取它们并用它们更新 JFrame 所以我想知道 ImageIO 是否可以检
  • 显示MJPEG流的跨浏览器解决方案

    有没有一种轻量级 免费且可靠的方式在跨浏览器环境中显示 MJPEG 我正在尝试显示来自轴2120 http www axis com techsup cam servers cam 2120 index htm我正在开发的网站上有 IP 摄
  • 播放 video.js ustream m3u8 文件流

    我尝试在网页中播放带有 video js 的 m3u8 文件流 但我无法做到这一点 我不知道错误在哪里
  • MySQL使用BLOB的二进制存储VS OS文件系统:大文件、大数量、大问题

    我正在运行的版本 基本上 最新的一切 PHP 5 3 1MySQL 5 1 41阿帕奇 2 2 14操作系统 CentOS 最新 情况是这样的 我有数千个非常重要的文档 从客户合同到语音签名 客户对合同的授权录音 文件类型包括但不限于jpg
  • Storm动态拓扑

    Storm 支持动态拓扑吗 我想要的功能是在 Storm 拓扑运行时根据用户要求动态更改拓扑 例如 当用户想知道流的前 10 个单词时 我使用前 10 个 Bolt 来处理它 当用户想知道其他内容时 我使用另一个 Bolt 来处理流并 拔掉
  • 数据库镜像/Postgres流复制

    我不是 DBA 我是基于企业数据库的应用程序的主要开发人员 我目前正在指定一些新机器来升级我们现有的企业数据库 目前 我们在 DR 站点上运行带有数据库的 Postgres 8 4 该数据库通过前员工执行的一些自定义 rsync 工作定期接
  • 从开放的 HTTP 流中读取数据

    我正在尝试使用 NET WebRequest WebResponse 类来访问 Twitter 流 API 此处 http stream twitter com spritzer json 我需要能够打开连接并从打开的连接中增量读取数据 目

随机推荐

  • win10安装系统自带应用

    以管理员身份启动系统自带的Windows Powershell组件 xff0c 接着输入Get AppxPackage allusers Select Name PackageFullName xff0c 通过该命令获取当前系统安装的所有应
  • SQL DDL从MySQL到Oracle

    最新一个项目的sql ddl为MySQL准备的 xff0c 我想在Oracle中使用 之前不太了解两者的区别 xff0c 结果报错一坨 于是顶着头皮开始看什么问题 xff0c 以下是我陷过的坑 xff0c 让大家看看 废话少说 xff0c
  • 7 MySQL安全概述

    1 常见因素 密码 常见的密码要求 xff1a 包含大小写 数字 特殊字符限制 长度 不要保存密码明文 为防止彩虹表 xff0c 也不要简单的使用hash方法 xff0c 可以采用hash hash password 43 salt 的方式
  • 关于SIFT和SURF介绍

    SIFT xff08 尺度不变特征变换 xff09 关于一些角点检测技术 xff0c 比如 Harris 等 它们具有旋转不变特性 xff0c 即使图片发生了旋转 xff0c 我们也能找到同样的角点 xff0c 但如果进行图像缩放 xff0
  • 7.2 MySQL权限系统原理

    MySQL权限系统的用户接口由SQL语句组成 xff0c 比如create user xff0c grant xff0c revoke 在数据库内部 xff0c MySQL把权限信息保存在MySQL database的赋权表中 MySQL服
  • 7.2.1 MySQL提供的权限

    MySQL提供的权限应用于不同的上下文和不同的操作级别 xff1a 管理权限使用户可以管理MySQL服务器的操作 这些权限是全局性的 xff0c 因为它们不是局限于某个特定的数据库 数据库权限应用于数据库和数据库的组成对象 这些权限可以被赋
  • 7.3 MySQL用户账号管理

    7 3 1用户名称和密码 MySQL把账号存储在mysql系统数据库的user表中 一个账号被定义成一个用户名称和能够连接到服务器的客户端主机 xff08 群 xff09 账号都有一个密码 MySQL支持授权插件 xff0c 也就是说一个账
  • 7 Oracle 管理用户和安全

    用户和安全概览 用户账号由一个用户名确认 xff0c 定义了用户的属性包括 xff1a 鉴权方式 数据库鉴权密码 永久存储和临时存储的默认表空间 表空间配额 账号状态 xff08 是否锁定 xff09 密码状态 xff08 是否过期 xff
  • linux-bash-find

    FIND 1 General Commands Manual FIND 1 1 NAME find search for files in a directory hierarchy 2 SYNOPSIS find H L P D debu
  • awk、任务管理

    awk awk F 39 39 39 span class hljs operator span class hljs keyword BEGIN span l 61 span class hljs number 0 span span c
  • java数据结构和算法

    常见的数据结构 数组 gt 方便通过下标随机访问数据 有序数组无序数组数组大小一旦确定无法变更栈 先进后出只能压入 xff08 push xff09 查看 xff08 peek xff09 删除 xff08 pop xff09 栈顶无法查找
  • spring概述

    spring框架主要包括以上几个方面
  • 查看进程_端口的命令

    1 Windows平台 在windows控制台窗口下执行 xff1a netstat nao findstr 9010 TCP 127 0 0 1 9010 0 0 0 0 0 LISTENING 3017 你看到是PID为3017的进程占
  • hadoop学习记录—2.8.2documentation—mapreduce Tutorial

    1 概况 hadoop MapReduce是一个软件框架 xff0c 在这个框架上可以很容易编写以可靠 容错地运行在大量廉价硬件组成的集群 xff08 上千节点 xff09 上 并行地处理大量数据 xff08 数TB数据集 xff09 的程
  • 服务器使用windows server 2008修改密码步骤教程

    1 进入服务器后右击计算机 xff0c 点击管理 xff0c 进入服务器管理器 2 在服务器管理器里面双击配置 xff08 打开下一列 xff09 双击本地用户和组 xff08 打开下一列 xff09 点击用户 3 右击Administra
  • yarn结构-2.9.0

    YARN最基本的想法就是把资源管理和任务调度 监听功能分成独立的守护进程 这个想法就是有一个全局的ResourceManager xff08 RM xff09 和每个应用独自的ApplicationMaster xff08 AM xff09
  • spark集群模式概览

    本文简短概述下spark如何在集群上运行 xff0c 使得更简单地理解涉及到的组件 可以通过读 应用提交指南 来学习在一个集群上加载应用 组件 spark应用作为独立的进程集运行在集群上 xff0c 在主应用 xff08 称为驱动程序 xf
  • SPARK RDD编程指南

    在高层次面上 xff0c 每个spark应用有一个驱动程序组成 xff0c 驱动程序运行用户的主函数 xff0c 在集群上执行很多并行操作 Spark提供的主要抽象是RDD xff0c 可以进行并行操作的跨节点分散的元素集 RDDs可以由H
  • Spark SQL,DataFrames and Datasets Guide

    概览 Spark SQL是Spark的一个结构化数据处理模块 不像基本的Spark RDD API xff0c Spark SQL提供的接口提供更多关于数据和执行的操作的结构信息 从内部看 xff0c Spark SQL使用额外的信息来执行
  • Structured Streaming Programming Guide-2.3.0

    概览 结构化流是一个可伸缩和容错的流处理引擎 xff0c 内置在Spark SQL 引擎中 你可以以对静态数据表达批处理计算的方式表达你的流计算 Spark SQL引擎会注意逐渐 持续第运行 xff0c 并随着流数据不断到来而更新最终的结果