Spark Streaming:如何在 Python 中获取已处理文件的文件名

2024-04-23

我对 Spark(老实说也是 Python)有点菜鸟,所以如果我错过了一些明显的东西,请原谅我。

我正在使用 Spark 和 Python 进行文件流处理。在我做的第一个示例中,Spark 正确地侦听给定目录并计算文件中单词的出现次数,因此我知道在侦听目录方面一切正常。

现在我试图获取出于审计目的而处理的文件的名称。我在这里读到http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvf[电子邮件受保护]%3E http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E这不是一件小事。我在这里得到了一个可能的解决方案http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgy[电子邮件受保护]%3E http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E我尝试按如下方式实现它:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fileName(data):
    string = data.toDebugString

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingFileNamePrinter")
    ssc = StreamingContext(sc, 1)
    lines = ssc.textFileStream("file:///test/input/")
    files = lines.foreachRDD(fileName)
    print(files)
    ssc.start()
    ssc.awaitTermination()

不幸的是,现在它不再每秒监听该文件夹,而是监听一次,输出“None”,然后什么也不做。这与有效代码之间的唯一区别是

files = lines.foreachRDD(fileName)

在我担心获取文件名(明天的问题)之前,有人能明白为什么这只检查目录一次吗?

提前致谢 中号


所以这是一个菜鸟错误。我将我的解决方案发布出来,供我自己和其他人参考。

正如@user3689574 所指出的,我没有在函数中返回调试字符串。这充分解释了为什么我得到“无”。

接下来,我在函数外部打印调试信息,这意味着它从来不是 foreachRDD 的一部分。将其移动到函数中,如下所示:

def fileName(data):
    debug = data.toDebugString()
    print(debug)

这会按应有的方式打印调试信息,并按应有的方式继续侦听目录。改变它解决了我最初的问题。就获取文件名而言,这变得非常简单。

目录没有变化时的调试字符串如下:

(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []

这清楚地表明没有文件。当文件复制到目录中时,调试输出如下:

(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []

通过快速的正则表达式,可以轻松地为您提供文件名。希望这对其他人有帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming:如何在 Python 中获取已处理文件的文件名 的相关文章

随机推荐

  • WebApi 强制操作返回 xml

    我有这个动作 public IHttpActionResult SearchFor int aboItemType DTO FilterColumns filter Do stuff return Ok
  • 单页中可以有多个 html、head 和 body 元素吗

    我有多个页面被合并到一个页面中 其中一些单独的页面有自己的 html head 和 body 元素 拥有这些会对页面的性能产生不利影响吗 FireBug 中的 DOM 似乎是正确的 每个元素只有一个 第一 不要这样做 浏览器是very如果涉
  • 闪亮的 R 操作按钮控制反应元素

    不确定我是否应该使用这个术语 基本上 我有一个反应函数 可以显示用户上传的 CSV 文件 我想使用action button触发情节生成过程 此时此刻 情节总是即时生成的 所以我想知道 在renderPlot函数 如何让action but
  • 基于类的通用 UpdateView 内联

    我有以下型号 class Cv models Model name models CharField name max length 250 objective models CharField objective max length 2
  • 删除 Windows 窗体中的标题栏

    如何删除窗口窗体顶部的蓝色边框 我不知道它的确切名称 您可以设置属性FormBorderStyle对于设计师中的任何一个人来说 或者在代码中 this FormBorderStyle System Windows Forms FormBor
  • 在 Windows 上通过 pip 使用 fastmath(gmp 或 mpir)构建 PyCrypto

    我通过 pip 在 Windows 上安装了 PyCrypto 但无法构建 Crypto PublicKey fastmath 因为找不到 GMP 我知道有一个二进制版本虚空 http www voidspace org uk python
  • 如何在调度代码时自动选择R中googlesheets4中的预授权帐户?

    我试图弄清楚自动允许 googlesheet4 包选择我的预授权帐户来下载特定谷歌表格的方法是什么 例如 我想每天运行以下一次 library googlesheets4 delta lt read sheet https docs goo
  • 找出这样一座塔中尽可能多的人

    首先我们看一下问题 马戏团正在设计一种塔式表演 由人们站在彼此的塔顶上组成 肩膀 出于实用和美观的原因 每个人都必须比他或她下面的人矮且轻 给定马戏团中每个人的身高和体重 编写一个方法来计算最大可能的人数 在这样的一座塔里 EXAMPLE
  • React.lazy() 与 Typescript

    我收到错误 Element 类型中缺少属性 default 但类型中需要属性 default 默认 组件类型 ts 2322 React lazy gt import i18n locales this props lang then o
  • 使用 corona sdk 验证电子邮件地址

    在我的项目中 有一个供用户填写详细信息的表单 其中有一个文本字段用于输入用户的电子邮件 ID 所以我需要在 corona 项目中验证该文本字段中的电子邮件 试试这个正则表达式 local email email protected cdn
  • ImportError:无法从“tensorflow.python.keras.engine”导入名称“network”

    尝试使用 anaconda 环境导入在 Tensorflow 2 3 0 上运行的 tf agents environments 时出现此错误 尝试重新安装tensorflow 仍然出现同样的错误 以管理员身份运行 jupyter 笔记本
  • Maven:经常从远程存储库下载元数据 xml 文件

    我正在使用 Maven 来处理 Java 项目 我认为只有在第一次编译时才需要互联网连接来从远程存储库下载所需的库 但每当我编译代码时 我都会收到几条下载消息 像这样的消息 Downloading http repo maven apach
  • CSS:将“float:right”元素移动到顶部(与列表的第一个元素对齐)

    我有一系列元素 最后一个元素有 css float left 我想将其显示在与第一个元素相同的高度 而不是显示在列表的底部 我无法更改 html 代码 因此它是列表中的最后一个 同时 我想将其保留在右侧 我怎样才能用CSS制作它 thank
  • JPA 枚举 ORDINAL 与 STRING

    可以使用以下任一方式在 JPA 中定义枚举 Enumerated EnumType ORDINAL or Enumerated EnumType STRING 我想知道这两个定义的优点和缺点是什么 我听说 ORDINAL 在 Eclipse
  • 为 AlertDialog 的进入和退出设置动画

    我必须滑入AlertDialog当它进入时 当它消失时 将其滑出 但它不是动画 那么如何让动画发挥作用呢 这是我所拥有的 public class SlideDialogFragment extends DialogFragment Ove
  • 将文件读入多维数组

    我想从文件中读取数字 n n 网格并将它们复制到多维数组中 一次一个 int 我有读取文件并将其打印出来的代码 但不知道如何获取每个 int 我认为我需要 splitstring 方法和空白分隔符 才能获取每个字符 但在那之后我不确定 我还
  • 创建子列表并从上一个列表中删除值

    我想在 Java 中创建一个子列表 并从上一个列表中删除子列表中的值 我的程序正确创建了子列表 但它没有从前一个列表中删除正确的值 My code for int i 0 i lt 4 i List sub new ArrayList pr
  • 在 django admin 中更改字段名称

    我正在自定义 django admin 我想更改字段的显示名称 我认为答案是here https docs djangoproject com en dev ref contrib admin 但我找不到它 感谢 Meta 类 我已经更改了
  • 同一解决方案中的 ASP.NET Core 5 MVC/Razor Pages 和 Web API 项目

    许多网站分为两部分 www example com 公众MVC https learn microsoft com en us aspnet core mvc overview view aspnetcore 5 0 剃刀页面 https
  • Spark Streaming:如何在 Python 中获取已处理文件的文件名

    我对 Spark 老实说也是 Python 有点菜鸟 所以如果我错过了一些明显的东西 请原谅我 我正在使用 Spark 和 Python 进行文件流处理 在我做的第一个示例中 Spark 正确地侦听给定目录并计算文件中单词的出现次数 因此我