非时间戳列上的 Spark 结构化流窗口

2024-05-03

我收到以下形式的数据流:

+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1  |
|2 |12:15:25 | 30| 1  |
|3 |12:15:26 | 30| 2  |
|4 |12:15:27 | 50| 2  |
|5 |12:15:27 | 30| 3  |
|6 |12:15:27 | 60| 4  |
|7 |12:15:28 | 50| 5  |
|8 |12:15:30 | 60| 5  |
|9 |12:15:31 | 30| 6  |
|. |...      |...|... |

我有兴趣将窗口操作应用于xxx列就像 Spark Streaming 中对时间戳的窗口操作一样,具有一定的窗口大小和滑动步长。

让在groupBy下面有窗口函数,lines表示窗口大小为 2、滑动步长为 1 的流数据帧。

val c_windowed_count = lines.groupBy(
  window($"xxx", "2", "1"), $"val").count().orderBy("xxx")

因此,输出应如下所示:

+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 |  2  |
|[1, 3]|30 |  2  |
|[2, 4]|30 |  2  |
|[2, 4]|50 |  1  |
|[3, 5]|30 |  1  |
|[3, 5]|60 |  1  |
|[4, 6]|60 |  2  |
|[4, 6]|50 |  1  |
|...   |.. | ..  |

我尝试使用partitionBy但 Spark 结构化流不支持它。

我正在使用 Spark 结构化流2.3.1。

Thanks!


目前无法使用 Spark 结构化流以这种方式在非时间戳列上使用窗口。然而,你能做的是转换xxx列到时间戳列,做groupBy and count,然后再变换回来。

from_unixtime可用于将自 1970-01-01 以来的秒数转换为时间戳。使用xxx列为秒,可以创建一个假时间戳在窗口中使用:

lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 seconds"), $"val").count()
  .withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")).as("window"))
  .filter($"window.col1" =!= 0)
  .orderBy($"window.col1")

上面,分组是在转换后的时间戳上完成的,下一行会将其转换回原始数字。过滤器已完成,因为前两行将是一个窗口[0,2](即仅在具有xxx equals 1)但可以跳过。

上述输入的结果输出:

+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50|    2|
| [1,3]| 30|    2|
| [2,4]| 30|    2|
| [2,4]| 50|    1|
| [3,5]| 30|    1|
| [3,5]| 60|    1|
| [4,6]| 60|    2|
| [4,6]| 50|    1|
| [5,7]| 30|    1|
| [5,7]| 60|    1|
| [5,7]| 50|    1|
| [6,8]| 30|    1|
+------+---+-----+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

非时间戳列上的 Spark 结构化流窗口 的相关文章

  • Access / Word 2010 VBA 邮件合并尝试打开 [文件夹名称].mdb 而不是 ACCDB 源

    我们正在尝试从 Access 中自动执行邮件合并过程 单击按钮后 VBA 将运行指定当前数据库 accdb 作为数据源并运行 SQL 具体代码如下 Set up Word Dim objWord As Object Set objWord
  • 迭代 pandas 数据框的最快方法?

    如何运行数据框并仅返回满足特定条件的行 必须在之前的行和列上测试此条件 例如 1 2 3 4 1 1 1999 4 2 4 5 1 2 1999 5 2 3 3 1 3 1999 5 2 3 8 1 4 1999 6 4 2 6 1 5 1
  • 如何为 Windows toast 注册协议?

    如何注册 Windows toast 协议 样本中来自https blogs msdn microsoft com tiles and toasts 2015 07 02 adaptive and interactive toast not
  • ngmodel与Angular2中复选框的动态数组绑定

    我有一个 Angular 2 组件 其中我从数组生成复选框列表 现在我需要根据选中的复选框填充不同的数组 这应该是双向绑定 这意味着如果复选框的值已在数组中 则必须已经检查了复选框 我在 Angular 1 中使用了一个名为 checkli
  • 闪亮井板宽度

    library shiny library shinydashboard ui lt dashboardPage dashboardHeader dashboardSidebar dashboardBody wellPanel tags d
  • 使用 crypt() 加密

    我目前正在做一个非常安全的登录系统 但我是 crypt 函数的新手 需要一些快速帮助 我在注册过程中使用 crypt 加密密码字符串并将其保存到数据库中 但是 我如何在登录过程中解密密钥 或者我应该怎么做 或者是否可以对提交的密码字符串进行
  • Android ScrollView fillViewport 不工作

    我有一个简单的布局 名称位于顶部 按钮位于屏幕底部 或者超出该按钮 以防我添加更多项目 所以我使用带有 LinearLayout 的 ScrollView 如下所示
  • 在 Android 中使用 iText 将图像添加到特定位置

    我想使用 Android 中的 iText 将图像添加到 PDF 文件中的特定位置 这是一个可填写的表单 我添加了作为图像占位符的文本框 我想要做的就是像这样获取该文本框和图像 public class FormFill public st
  • Googletest:如何异步运行测试?

    考虑到一个包含数千个测试的大型项目 其中一些测试需要几分钟才能完成 如果按顺序执行 整套测试需要一个多小时才能完成 通过并行执行测试可以减少测试时间 据我所知 没有办法直接从 googletest mock 做到这一点 就像 async选项
  • NGinx $proxy_add_x_forwarded_for 和 real_ip_header

    我在 NGinx 下有一个 web 应用程序和另一个前端负载均衡器 如下所示 x x x x IP 地址 客户端 a a a a gt LB b b b b gt NGX c c c c gt WEBAPP d d d d 这是我的 NGi
  • Typescript 函数接口重载

    我有以下代码 interface MySecondInterface a type A interface MyInterface val1 string val2 string MySecondInterface a
  • 实例化 Microsoft.Office.Interop.Excel.Application 对象时出现错误:800700c1

    实例化 Microsoft Office Interop Excel Application 以从 winforms 应用程序生成 Excel 时 出现以下错误 这之前是有效的 但突然间它停止工作了 尽管代码和 Excel 版本没有变化 我
  • 带显示块的SPAN

    和默认有什么区别 div 元素和默认值 span 元素与display block HTML 元素的有效性和语义存在差异 否则它们是相同的 div and span两者都被定义为通用容器 在 HTML 方面没有更深层次的含义 一个默认为块显
  • 从 Azure 应用服务连接到 MongoDB Atlas 集群

    我在 Azure 上有一个 Web 应用程序 它连接到 Atlas cloud mongodb com 上托管的 MongoDB 集群 我想使用 Atlas 这样我就不必关心 MongoDb 配置 问题是我的集群连接超时 我必须在我的 mo
  • 是否可以在 C# 中强制接口实现为虚拟?

    我今天遇到了一个问题 试图重写尚未声明为虚拟的接口方法的实现 在这种情况下 我无法更改接口或基本实现 而必须尝试其他方法 但我想知道是否有一种方法可以强制类使用虚拟方法实现接口 Example interface IBuilder
  • 匿名结构体作为返回类型

    下面的代码编译得很好VC 19 00 23506 http rextester com GMUP11493 标志 Wall WX Za 与VC 19 10 25109 0 标志 Wall WX Za permissive 这可以在以下位置检
  • Android:如何检测手机设置中的语言已更改

    我如何检测我的手机语言是否已更改 例如 Facebook 应用程序将向我们宣布 please wait we preparing your language i used myString Locale getDefault getDisp
  • 保存符号方程以供以后使用?

    From here http www mathworks com help releases R2011a toolbox symbolic brvfu8o 1 html brvfxem 1 我正在尝试求解这样的符号方程组 syms x y
  • 如果产品重量超过1000克,如何以公斤为单位显示

    在 Storefront 主题中 我使用下面的代码将格式化重量从 1000g 更改为 1kg add action woocommerce after shop loop item title show weight 10 function
  • android ndk 硬件调试内存

    背景 我对 C 很有经验 对 Android 和 Java 还很陌生 但这是编程的环境问题 我已经用 ANSI C 开发了一个管理应用程序 可以移植到任何操作系统 只需在依赖于操作系统的代码中添加 UI 即可 它使用相当多的内存 特别是对于

随机推荐

  • 无法从超时获得自动化扩展:从渲染器接收消息超时

    使用 Selenium Webdriver C 我时不时会收到下一个错误 System InvalidOperationException 未知错误 无法从超时获取自动化扩展 从渲染器接收消息超时 3 959 会话信息 chrome 37
  • 点分隔的字符串资源名称有什么用?

    我使用 Snake style 来命名字符串资源 在某人的代码中 我发现了另一种带有点的符号 我无法找到关于这个主题的任何单词
  • 如何在 Windows 7 中配置 cabal?

    我已经在Windows 7中安装了Haskell Platform 2012 我在控制台中编写cabal update我收到消息说有新版本的阴谋集团 我写的cabal install cabal install 安装完成后 它告诉我 cab
  • 如何删除 TextBlock 周围的多余空间

    我为我的 TextBlock 设置了以下内容
  • 将嘈杂的硬币重塑为圆形

    我正在使用 JavaCV OpenCV 包装器 进行硬币检测 但是当硬币连接时我遇到了一些问题 如果我尝试侵蚀它们以分离这些硬币 它们就会失去圆形形状 如果我尝试计算每个硬币内部的像素 可能会出现问题 因此某些硬币可能会被误算为更大的硬币
  • SQL Server 2008 FileStream 与普通文件

    我正在创建一个像 youtube 这样的应用程序来存储视频 我需要一些建议 我应该使用 SQL Server FileStream 来存储视频文件 还是应该将它们存储在硬盘上的某个位置并将路径记录为 SQL Server 内的 varcha
  • 将第一行粘贴到列表中的列名称

    我有 68 个数据文件 全部具有相同的标识符 但具有不同的指示符 我将这些单独的文件转换为一个列表 其中每个数据框作为一个单独的元素 每个数据框的第一行是年份 我想将其粘贴到列名称中 我希望能够用 分隔它 例如 现在列名称为 Arbeits
  • 在 WPF 中向上/向下移动 ListBoxItem

    我创建了一个包含文件名的列表框 我想为用户提供一个选项 可以使用上 下按钮并使用拖放来上下移动文件名 任何人都知道如何实现此功能 XAML 代码
  • 如何续订过期的 ClickOnce 证书?

    我需要对一年多没有碰过的 ClickOnce 应用程序进行一些更改 因此证书已过期 我读过 使用新证书发布将使应用程序失败 因为它将使用不同的密钥进行签名 因此我认为我需要使用相同的证书但不知道如何更新它 如果您正在寻求快速解决方案 那么您
  • 如何将数字 010 转换为字符串“010”

    在控制台中执行一些随机表达式时 我发现 010 返回8 甚至 011 0100 都通过考虑八进制数字系统返回结果 如果我想转换一个数字 我该怎么做010到一个字符串 010 不仅为了010但对于每个相似的数字 我设法找到了一种类似的解释he
  • FindObjectOfType 返回 null

    我遇到的问题是我捡起一个掉落的物品 为枪添加弹药 使用所有方法和变量构建了一个 Gun 类 构建了一个从 Gun 类派生的 Rifle 类 步枪工作完美 没有任何问题 我现在添加一个 拾取 系统 其中 x 数量的敌人会掉落拾取 这是要拾取的
  • 使用 WIA 检测所有可用的扫描仪分辨率

    如何使用 WIA 2 0 以编程方式检测指定扫描仪的所有可用分辨率 以 dpi 为单位 支持的页面大小怎么样 有任何想法吗 伪代码 假设您有设备信息 请连接到它 var device deviceInfo Connect 如果设备不为空 那
  • Postgres 检查文本数组中的约束以确保值的有效性

    我想创建类似的东西 CHECK ALL scopes IN read write delete update scopes这是表中的一个字段text 我想确保该数组中的所有值都是上面的值之一 对此有何意见 是否有可能通过以下方式获取这些值S
  • 如何定义与 Backbone 一起使用的 jade 模板

    我需要使用模板来渲染每个 ItemView var ItemView Backbone View extend className item template template itemTemplate html initialize fu
  • 如果新块大小小于初始块大小,我是否应该强制重新分配检查?

    在这种情况下 realloc 会失败吗 int a NULL a calloc 100 sizeof a printf 1 ptr d n a a realloc a 50 sizeof a printf 2 ptr d n a if a
  • Keycloak 重定向 URI 提供 https://localhost 而不是 http://localhost

    我正在尝试在 Keycloak 上验证 Cordova Android 应用程序 我之前提出过一个关于它的问题 但我在这里将其简化为仅提供要点 因为我在过去 2 3 天中发现了很多东西 Cordova Android 应用程序中的网页不可用
  • 在 JSFiddle 上运行的简单 AngularJS

    如何用以下代码制作 jsfiddle div ul li num li ul div
  • 主表节点缺失

    我已经安装了 Microsoft SQL Server 2012 Express 当我启动 Management Studio 时 我无法查看系统数据库上的任何 表节点 这是一个错误吗 这是一个错误 如以下链接所述 http connect
  • Laravel 4 - 工匠错误 SQLSTATE[42000]

    我正在尝试为我的用户表创建一个新的迁移 我有以下架构 Schema create users function t t gt increments id t gt string username 16 t gt string passwor
  • 非时间戳列上的 Spark 结构化流窗口

    我收到以下形式的数据流 id timestamp val xxx 1 12 15 25 50 1 2 12 15 25 30 1 3 12 15 26 30 2 4 12 15 27 50 2 5 12 15 27 30 3 6 12 15