Spark Streaming窗口操作

2023-12-24

以下是获取窗口大小为 30 秒、幻灯片大小为 10 秒的字数统计的简单代码。

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sc, Seconds(5))

// read from text file
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))

// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))

val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()

但是,我从这一行收到错误:

val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

。特别是,从_ + _。错误是

51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3))

有人能告诉我问题是什么吗?谢谢!


这非常容易修复,只需明确类型即可。
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b, Seconds(30), Seconds(10))

在这种情况下 scala 无法推断类型的原因解释如下这个答案 https://stackoverflow.com/questions/9118264/multiple-parameter-closure-argument-type-not-inferred

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming窗口操作 的相关文章

随机推荐

  • 将字符串拆分为具有多个单词边界定界符的单词

    我认为我想做的是一项相当常见的任务 但我在网上没有找到任何参考 我有带标点符号的文本 我想要一个单词列表 Hey you what are you doing here 应该 hey you what are you doing here
  • 等待大文件在 Excel 中打开

    我一直在尝试在 VBA 中循环处理一堆大的 csv 文件 每个大约 50MB 在每次迭代中 我都会打开一个新的 CSV 来操作数据 但是当 csv 打开时 会出现一条下载消息 指出文件正在打开 并且在 VBA 等待其完成时 进度条总是卡在某
  • 是否有更高级的 C# 控制台库? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有任何 NET 库可以 输出彩色字符 单词 在控制台中绘制进度条等内容 类似于 wget 我可能会
  • $project MongoDB 中的 $filter 使用 Spring Data

    我有一个子文档 它是父文档的数组 设备 在该数组中 我有一个属性 它是日期属性 我想按确定的日期找到包含子子文档的父文档 如下所示 id ObjectId 5818fa596969a1339093a7da fecha ISODate 201
  • malloc() 和 malloc_consolidate() 中的段错误

    当我查看 gdb 中的回溯时 我的应用程序有时会出现段错误 主要是在 malloc 和 malloc consolidate 中 我确认机器有足够的可用内存 它甚至没有开始交换 我检查了数据段和最大内存大小的 ulimit 两者都设置为 无
  • Android 字母部分列表视图,如联系人应用程序列表视图

    现在谁能帮我看看我的观点是这样的 请帮助我 我太混乱了 这是我的代码 电话簿列表视图 xml
  • 如何将本地角色与从 ansible-galaxy 加载的角色分开?

    我观察到从 Galaxy 下载的角色安装在roles 目录 我们已经有内部目录 因此很难区分外部目录和内部目录 有没有办法将它们保存在单独的目录中 这样我们就可以避免混淆 在大多数情况下 我希望有一个更新银河系脚本的脚本 并且我们不会在内部
  • Django表单错误列表渲染位置

    当我使用 f as p 显示有错误的表单时 错误列表 ul 始终首先出现 然后是标签和输入字段 例如 ul class errorlist li This field is required li ul p p
  • 如何将所有图像转换为 jpg?

    我有脚本
  • 使用 nltk.download() 下载错误

    我正在使用 Python 试验 NLTK 包 我尝试使用下载NLTKnltk download 我收到这样的错误消息 如何解决这个问题呢 谢谢 我使用的系统是VMware下安装的Ubuntu IDE是Spyder 使用后nltk downl
  • .NET Core 依赖注入 -> 获取接口的所有实现

    我有一个名为IRule以及实现该接口的多个类 我想使用 NET Core 依赖注入容器来加载所有实现IRule 所以所有的规则都被执行了 不幸的是我无法完成这项工作 我知道我可以注射IEnumerable
  • CORBA 与 Web 服务

    为什么 WebServices 比 CORBA 更具优势 我怀疑一切都是从防火墙问题开始的 CORBA 请求是二进制的 正常工作需要多个随机端口 因此 CORBA 请求和响应在第一次出现时会被防火墙阻止 HTTP 和 FTP 也使用虚拟端口
  • 在文本框中包含按钮

    我想补充一点Button 这会删除中的所有文本TextBox 是否可以将此 删除 按钮放入文本框中 就像在iPhone 文本框 我希望在你的帮助后它看起来像这样 我用控制模板玩了一些东西 但没有得到想要的结果 解决此问题的一种方法可能是使用
  • 如何管理多个 grails/groovy 版本的开发?

    我有一个使用 groovy 1 7 5 和 grails 1 3 4 的项目 但我的新项目尚未启动 它将使用最新版本的 groovy 1 8 6 和 grails 2 0 4 我将在这两个项目中工作 那么如何在Windows环境下管理不同版
  • 无法将带有 ACL 公共读取的文件上传到 Digital Ocean 空间

    我正在尝试从浏览器将图像上传到数字海洋空间 这些图像应该是公开的 我能够成功上传图像 然而 尽管 ACL 设置为public read 上传的文件始终是私有的 我知道它们是私有的 因为 a 仪表板显示权限是 私有 b 因为公共 URL 不起
  • 扩展 collections.namedtuple 是否有效?

    我想用类似的东西collections namedtuple 它很好地强制了不变性并促进了简单的值类 但它不允许子类化 例如我想做类似下面的事情添加额外的只读属性 https stackoverflow com questions 2193
  • 如何使用正则表达式匹配单个空格总数而不仅仅是单个空格

    这是我目前拥有的 它将匹配字母数字字符和空格 a z0 9 s 我想做的是确保只有在不超过一 1 个空格的情况下才会匹配 上面将匹配 这是一个测试 但我只希望它在输入是 This isatest 或 T hisisatest 时匹配 一旦空
  • 根据轮廓分割 pdf

    我想使用 pyPdf 根据大纲分割 pdf 文件 其中大纲中的每个目标都指 pdf 中的不同页面 示例大纲 main gt points to page 1 sect1 gt points to page 1 sect2 gt points
  • Google 云端硬盘上传进度条

    有没有办法为上传到 Google Drive 创建进度条 我正在使用适用于 Javascript 的 Google Drive SDK 我在他们的文档中根本找不到任何内容 真的很感谢它的一点帮助 我查阅了很多代码 几乎是我这几周在谷歌上找到
  • Spark Streaming窗口操作

    以下是获取窗口大小为 30 秒 幻灯片大小为 10 秒的字数统计的简单代码 import org apache spark SparkConf import org apache spark streaming import org apa