使用 textFileStream 的 Python Spark Streaming 示例不起作用。为什么?

2024-02-22

我使用spark 1.3.1和Python 2.7

这是我第一次体验 Spark Streaming。

我尝试使用 Spark Streaming 从文件中读取数据的代码示例。

这是示例的链接:https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py

我的代码如下:

conf = (SparkConf()
     .setMaster("local")
     .setAppName("My app")
     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream('../inputs/2.txt')
counts = lines.flatMap(lambda line: line.split(" "))\
          .map(lambda x: (x, 1))\
          .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

我遇到了类似的问题,但我意识到,一旦我设置 Streaming 运行,streamingcontext 就会从新文件中获取数据。它仅在流启动后摄取新放置在源目录中的数据。

实际上,pyspark文档说得非常明确:

文本文件流(目录)

Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be wrriten to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 textFileStream 的 Python Spark Streaming 示例不起作用。为什么? 的相关文章

  • Python OverflowError:数学范围错误[重复]

    这个问题在这里已经有答案了 当我尝试这个计算时 出现溢出错误 output math exp 1391 12694245 100 我知道发生这种情况是因为使用的数字 超出了双精度数的范围 但有什么方法可以解决这个问题并获得输出值 有人可以帮
  • 以 str.format 切片字符串

    我想实现以下目标str format x y 1234 5678 print str x 2 str y 2 我能够做到这一点的唯一方法是 print 0 1 format str x 2 str y 2 现在 这是一个例子 我真正拥有的是
  • 如何使用playsound模块停止音频?

    如何在Python代码中通过playaudio模块停止音频播放 我播放过音乐 但我无法停止音乐 我怎样才能阻止它 playsound playsound name of file 您可以使用多处理模块将声音作为后台进程播放 然后随时终止它
  • 使用 glGetFloatv 检索 pyglet 中的模型视图矩阵

    我正在使用 pyglet 在 python 中进行 3D 可视化 并且需要检索模型视图和投影矩阵来进行一些选择 我使用以下方式定义我的窗口 from pyglet gl import from pyglet window import wi
  • 为什么我会得到“ufunc 'multiply' did not contains a loop with Signature Matching types dtype('S32') dtype('S32') dtype('S32')”,其值来自 raw_

    我正在尝试创建一个非常简单的程序 它将绘制一个抛物线 其中v是速度 a是加速度和x是时间 用户将输入值v and a then v and a and x将决定y 我试图用这个来做到这一点 x np linspace 0 9 10 a ra
  • Python 中嵌套列表的排序和分组

    我有以下数据结构 列表的列表 4 21 1 14 2008 10 24 15 42 58 3 22 4 2somename 2008 10 24 15 22 03 5 21 3 19 2008 10 24 15 45 45 6 21 1 1
  • 手动安装开放多语言世界网 (NLTK)

    我正在使用一台只能访问专用网络并且无法从命令行发送指令的计算机 因此 每当我必须安装 Python 包时 我都必须手动安装 我什至不能使用 Pypi 幸运的是 NLTK 允许我手动下载语料库 从here https www nltk org
  • 如何使用appium自动化Android手机后退按钮

    我正在使用 Appium python 客户端库 对 Android 上的混合移动应用程序进行测试自动化 我无法找到任何方法来自动化或创建手势以使用 电话后退 按钮返回到应用程序的上一页 有没有可以使用的驱动函数 我尝试了 self dri
  • 完全定制的Python帮助用法

    我正在尝试使用 Python 创建完全自定义的 帮助 用法 我计划将其导入到许多我想要具有风格一致性的程序中 但遇到了一些麻烦 我不知道为什么我的描述忽略换行符 尝试过 和 我无法让 出现在 ARGS 行的 换行符之后 显然它们坐在自己的行
  • 如何从 Python 3.5 降级到 3.4

    我想安装 kivy 链接在这里 https kivy org docs installation installation windows html install win dist 用于项目 但是 当尝试使用 pip 安装它所依赖的包时
  • 是否可以在 Jupyter 笔记本中显示控制台?

    我希望能够使用 Jupyter 笔记本中的控制台在环境中进行摆弄 添加额外的单元格意味着我总是必须滚动到最底部或在我想要 类似控制台 文本字段的任何地方创建新单元格 是否可以有一个永久的控制台窗口 例如在窗口底部 Thanks 您可以启动连
  • 如何使直方图列的宽度都相同

    我在操作直方图时遇到了一些麻烦 我有一个包含两列的 df 我将它们绘制为堆叠直方图 我将它们放入特定的垃圾箱中 请参阅下面的代码 但我想在最后制作一个大垃圾箱 4000 10000 但是 默认情况下 大垃圾箱的列宽很大 有没有办法让这个大垃
  • t /= d 是什么意思? Python 和错误

    t current time b begInnIng value c change In value d duration def easeOutQuad swing function x t b c d alert jQuery easi
  • 在 python 中使用 subprocess.call 时如何将 stdout 重定向到文件?

    我正在从另一个 python 脚本 A 调用一个 python 脚本 B 使用 subprocess call 如何将 B 的标准输出重定向到指定的文件 我正在使用 python 2 6 1 传递一个文件作为stdout参数为subproc
  • 如何在 matplotlib 中第一个 x 轴的底部添加第二个 x 轴?

    我指的是已经提出的问题here https stackoverflow com questions 10514315 how to add a second x axis in matplotlib 在此示例中 用户通过将第二个轴添加到与标
  • 枚举上的 random.choice

    我想用random choice on an Enum I tried class Foo Enum a 0 b 1 c 2 bar random choice Foo 但是这段代码失败了KeyError 我怎样才能随机选择一个成员Enum
  • 返回吃异常

    我至少发现了以下行为weird def errors try ErrorErrorError finally return 10 print errors prints 10 It should raise NameError name E
  • 设置字符串中单词或字符数的限制

    假设我有一个字符串元素列表 wordlist hi what s up home diddle mc doo Oh wise master kakarot hello have a da 我希望列表中的每个元素最多包含 3 个单词或 20
  • PyMC3 和 Theano - 导入 pymc3 后,有效的 Theano 代码停止工作

    一些简单的 theano 代码可以完美运行 当我导入 pymc3 时停止工作 这里有一些片段可以重现错误 Initial Theano Code this works import theano tensor as tsr x tsr ds
  • 使用 TkInter 绑定设置不可交互(点击)覆盖

    我已经浏览了其他几篇关于类似问题的帖子 所有这些似乎都指向this https stackoverflow com questions 29458775 tkinter see through window not affected by

随机推荐

  • 将 Javascript 变量传递给 Codeigniter 中的 PHP 控制器

    大家好 我有这个 javascript 它必须传递一些变量 包括数组 我的问题是我无法使用 URL 传递这些值 因为我可能会处理许多值 我正在尝试使用 ajax JSON 但我无法检索值 这是我的 javascript function p
  • 如何使用 BeautifulSoup 获取标签内的 html 文本

    如何从示例 HTML 中提取数据beautifulsoup
  • Rails:表单提交后访问视图中的参数

    在我的 Rails 3 2 项目中 我有一个表单来创建一个新站点new html erb in app views sites div class field br div div class actions div 然后create函数于
  • IntelliJ找不到具体的方法

    我在以下代码中遇到编译错误 我不知道如何修复 String path document txt File file new File path Files readString file toPath cannot find symbol
  • Android:使用 xml 布局的日期选择器

    我尝试使用 xml 代码显示日期选择器 但它没有显示任何东西 我可以只通过java代码显示DatePicker吗 datePickerExample java public class datePickerExample extends A
  • 是否可以使用活动记录为表定义复合主键? [复制]

    这个问题在这里已经有答案了 我在没有 Rails 的 ruby 项目中使用 ActiveRecord 我需要为表定义复合主键 通常迁移会自动创建主键 是否可以使用活动记录为表定义自己的复合主键 有一些宝石可以赋予你这种能力 例如复合主键 h
  • 在 d3 中的图表上渲染之前简化线条

    我正在尝试绘制从蜂箱实时收集的数据 我们计划每 5 分钟收集一次数据 因此一年内将有大约 100 000 个数据点 我想设置一个图表来绘制数据 并在每次数据库更改时让新数据进入图表 使用 Meteor 我有一个简单的模型datacomb m
  • 如何使用 Python API 获取我在盈透证券的账户头寸?

    编辑 我找到了有关错误消息的解决方案 这是 IB 的 API 上的错误 我在下面作为答案显示的代码对于那些寻找干净的解决方案来从 IB 账户读取头寸和资产净值的人来说应该很有用 原来的问题 参见下面的解决方案 在此留下原始问题以获取上下文
  • React Apollo GraphQL 搜索/过滤

    我有一个使用 Apollo 客户端的带有 GraphQL 服务器的 React 项目 我试图弄清楚如何根据搜索文本和过滤选择更改查询结果 如果有人可以查看我的代码并给我一些帮助 我将不胜感激 对所有代码感到抱歉 我想也许它们都是相关的 服务
  • python:安装日志记录模块[关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 我有 python 3 5 尝试在 PyCharm 中安装日志记录包 但出现错误 安装包 logging 时发生错误 确保您使用此包支持的
  • 改造 JSON 反序列化对象的 $ref 对其原始副本的引用

    我使用 Microsoft Net 和 Breeze 来获取 API 并且使用 Retrofit 得到的结果嵌套了重复的相同对象 例如 EmployeeJob 具有 Customer 导航属性 因此 API 结果如下所示 Id 1 Cust
  • PHP 中的会话超时:最佳实践

    两者之间的实际区别是什么session gc maxlifetime and session cache expire 假设我希望用户会话在 15 分钟无活动后 而不是首次打开后 15 分钟 无效 其中哪一项对我有帮助 我也知道我能做到se
  • Rails:activeadmin 覆盖创建操作

    我有一个 activeadmin 资源 它具有 own to user 关系 当我在活动管理中创建模型的新实例时 我想将当前登录的用户关联为创建该实例的用户 我想这是相当标准的东西 所以 我让它工作 controller do def cr
  • 直接在 PhpStorm 中运行 PHP 脚本

    如何在 PhpStorm 中立即运行当前的 PHP 文件 如何像 PyCharm 一样在 PHPstorm 中执行 PHP 命令 在编辑器中右键单击 选择Run 或按 ctrl shift f10 用于命令行脚本执行 如果您想在网络服务器上
  • WPF ItemsControl:将项目的类型限制为特定类型

    我正在创建一个 WPF 自定义控件来作为练习 以在 VS 面板中显示日志消息 错误 警告 消息 该控件是一个 ItemControl 每个项目都是一条要显示的消息 但我必须将消息分类到正确的类别中 因此我需要每个项目公开一些内容 可能是一个
  • 在编写 WinAPI 应用程序时是否应该使用匈牙利表示法? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我最近开始学习 Win32 API 我讨厌匈牙利表示法 变量名中那些愚蠢的前缀 使代码看起来丑陋且几乎不可读 但是正如您可能知道的那样 它绝对无
  • C++中make_shared和普通shared_ptr的区别

    std shared ptr p1 std make shared foo std shared ptr p2 new Object foo 许多谷歌和 stackoverflow 帖子都在这方面 但我无法理解为什么make shared比
  • 在 C 中将可变长度数组声明为全局变量

    如何将可变长度数组声明为全局变量 当在扫描长度之前在函数中声明可变长度数组时 它会编译但不会运行 它给出了分段错误 当相同的声明语句移动到扫描语句下方时 它运行良好 如果我们想要一个全局可用于所有函数的可变长度数组 我们该怎么做呢 这里的问
  • 将 XAML PathGeometry 转换为 WPF PathGeometry

    我想要由 LineSegment 组成的 PathGeometry 所以 我使用第一个代码 但它是错误的 PathGeometry temp PathGeometry Geometry Parse
  • 使用 textFileStream 的 Python Spark Streaming 示例不起作用。为什么?

    我使用spark 1 3 1和Python 2 7 这是我第一次体验 Spark Streaming 我尝试使用 Spark Streaming 从文件中读取数据的代码示例 这是示例的链接 https github com apache s