Spark Streaming 无法从单个文件读取流数据

2024-02-05

我正在尝试从使用 Spark 流 API“textFileStream”连续附加的文本文件中读取流数据。但无法使用 Spark Streaming 读取连续数据。 Spark中如何实现呢?


这是预期的行为。为了基于文件的源 https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources (like fileStream):

  • 必须通过原子地将文件移动或重命名到数据目录中来在数据目录中创建文件。
  • 文件一旦移动,就不得更改。因此,如果不断追加文件,则不会读取新数据。

如果您想连续读取附加内容,则必须创建自己的源,或使用单独的进程,该进程将监视更改,并将记录推送到例如 Kafka(尽管很少将 Spark 与支持附加的文件系统结合起来)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming 无法从单个文件读取流数据 的相关文章

  • 在Python Spark中查看RDD内容?

    在 pyspark 中运行一个简单的应用程序 f sc textFile README md wc f flatMap lambda x x split map lambda x x 1 reduceByKey add 我想使用 forea
  • Spark - java.lang.OutOfMemoryError:请求的数组大小超出 VM 限制

    我正在尝试对 Cloudera 的 Spark 2 1 0 中的数据帧进行 groupBy 操作 该集群位于总 RAM 约为 512GB 的 7 节点集群上 我的代码如下 ndf ndf repartition 20000 by user
  • Spark 上的 Hive 2.1.1 - 我应该使用哪个版本的 Spark

    我在跑蜂巢2 1 1 Ubuntu 16 04 上的 hadoop 2 7 3 根据Hive on Spark 入门 https cwiki apache org confluence display Hive Hive on Spark
  • Spark Worker 在 Heartbeater 中与 Spark Driver 通信的超时时间为 3600 秒

    我没有配置任何超时值 而是使用默认设置 在哪里配置3600秒超时 怎么解决呢 错误信息 18 01 10 13 51 44 WARN Executor Issue communicating with driver in heartbeat
  • 将案例类传递给函数参数

    抱歉问了一个简单的问题 我想将案例类传递给函数参数 并且想在函数内部进一步使用它 到目前为止我已经尝试过这个TypeTag and ClassTag但由于某种原因 我无法正确使用它 或者可能是我没有看到正确的位置 用例与此类似 case c
  • 在 Spark-submit 上的 _find_and_load 中获取文件“”,第 991 行

    我目前使用的是Python 3 7 9 spark spark 2 4 6 bin hadoop2 6 在这个项目 venv 中 我的设置为 kafka python 2 0 2 pip 21 2 4 py4j 0 10 9 pyspark
  • Python Spark DataFrame:用 SparseVector 替换 null

    在 Spark 中 我有以下名为 df 的数据框 其中包含一些空条目 id features1 features2 185 5 0 1 4 0 1 0 null 220 5 0 2 3 0 1 0 10 1 2 6 0 1 225 null
  • 如何过滤 pyspark 列表中值的列?

    我有一个数据框原始数据 我必须在 X 列上应用值 CB CI 和 CR 的过滤条件 所以我使用了下面的代码 df dfRawData filter col X between CB CI CR 但我收到以下错误 Between 恰好需要 3
  • Spark:并行转换多个数据帧

    了解如何在并行转换多个数据帧时实现最佳并行性 我有一系列路径 val paths Array path1 path2 我从每个路径加载数据帧 然后转换并写入目标路径 paths foreach path gt val df spark re
  • 列对象不可调用 Spark

    我尝试安装 Spark 并运行教程中给出的命令 但出现以下错误 https spark apache org docs latest quick start html https spark apache org docs latest q
  • Kubernetes WatchConnectionManager:执行失败:HTTP 403

    我遇到错误Expected HTTP 101 response but was 403 Forbidden 在我使用以下命令设置新的 Kubernetes 集群之后Kubeadm当我提交下面遇到的 pyspark 示例应用程序时 只有一个主
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 过滤字符串上的 Spark DataFrame 包含

    我在用火花1 3 0 http spark apache org releases spark release 1 3 0 html and 火花阿夫罗1 0 0 https github com databricks spark avro
  • Spark日期格式问题

    我在火花日期格式中观察到奇怪的行为 实际上我需要转换日期yy to yyyy 日期转换后 日期应为 20yy 我尝试过如下 2040年后失败 import org apache spark sql functions val df Seq
  • 如何使用 PySpark 预处理图像?

    我有一个项目 需要为 1 设置大数据架构 AWS S3 SageMaker 的概念验证使用 PySpark 预处理图像 2 执行 PCA and 3 训练一些机器或深度学习模型 我的问题是了解如何使用 PySpark 操作图像数据 但无法在
  • 更改 Spark SQL 中的 Null 顺序

    我需要能够按升序和降序对列进行排序 并且还允许空值位于第一个或空值位于最后一个 使用 RDD 我可以将 sortByKey 方法与自定义比较器结合使用 我想知道是否有使用 Dataset API 的相应方法 我了解如何将 desc asc
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 在 Spark MLlib 上使用 Java 中的 Breeze

    在尝试从Java使用MLlib时 使用微风矩阵运算的正确方法是什么 例如scala 中的乘法很简单 matrix vector 相应的功能在Java中是如何表达的 有一些方法 例如 colon times 可以通过正确的方式调用 breez
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na

随机推荐

  • Javascript匿名函数不更新全局变量

    我在一些代码中进行了 getJSON 调用 该调用似乎没有更新全局变量 但我不明白为什么 JSON 数据加载正常 但由于某种原因 全局 EventOptions 数组未在 for 循环中更新 大写的注释指的是变量 有任何想法吗 谢谢 fun
  • 无法在外键上创建表(错误号:150)

    我看到很多同样的问题 但我无法解决我的问题 如果我运行这段代码
  • 如何设置 thymeleaf th:来自其他变量的字段值

    我有一个简单的文本输入字段 我必须在其中设置一个对象的默认值并将其最终值保存在其他对象中 以下代码不起作用 div div
  • 简单的分布式 Erlang

    我有一个简单的模块 module dist compile add 3 add From X Y gt From X Y 我正在启动两个节点 一与 erl sname foo 另一个与 erl sname bar 在我正在做的酒吧节点上 g
  • 速度较慢的 numpy.argmax/argmin 的更快替代方案

    我正在使用很多argmin and argmax在Python中 不幸的是 该功能非常慢 我已经做了一些搜索 我能找到的最好的就在这里 http lemire me blog archives 2008 12 17 fast argmax
  • Firefox 4中的Greasemonkey脚本,想更改网页上的一行代码

    好吧 我会尽量让这件事变得尽可能简单 但不会太含糊 我想更改网页上的一行代码 以防止出现预览窗格 原始代码行 div class previewpane 我想将上面的行替换为 div class previewpane previewpan
  • 文件加密与内容加密不同吗

    加密文件和加密文件内容之间有什么区别吗 如果是这样 两者该如何做 文件加密是从外部加密整个文件 例如通过在 Windows XP 资源管理器中右键单击 内容加密通常被用作文件加密的同义词 但两者是不同的 内容加密是指对文件的内容进行加密 或
  • Pandas 合并错误:MemoryError

    Problem 我正在尝试将两个相对较小的数据集放在一起 但合并会引发MemoryError 我有两个国家贸易数据汇总数据集 我试图将其合并到关键年份和国家 地区 因此需要对数据进行特殊设置 不幸的是 这使得使用concat正如这个问题的答
  • 理解非齐次 numpy 数组

    我最近开始使用 numpy 并注意到一件奇怪的事情 import numpy as np a np array 1 2 3 4 5 9 8 print a shape shape print a 1 0 在这种情况下 形状是2L 但是 如果
  • 从 awk 调用 python 脚本

    大多数解决方案都从 python 调用 awk 但我想反过来做 我有一个从文件中提取信息的 python 脚本 然而 所述文件名在 awk 脚本的列中引用 如何向 python 传递参数 s20s 文件名并从标准输出获取输入 我想将输出添加
  • 填充嵌入的 UITableViewController

    我有一个使用导航控制器的应用程序 在其中一个视图中 我有一个视图容器 其中嵌入了使用静态单元格的 UITableViewController 我需要使用从上一个视图传入的数据来填充此表视图控制器的标签 流程如下 View1 gt segue
  • 用整数哈希替换字符串名称有哪些好方法

    通常 数据驱动设计中的实体和组件或游戏代码的其他部分都会有名称 如果您想准确地找出正在处理的对象 则需要检查这些名称 void Player Interact Entity myEntity if myEntity gt isNearEno
  • formGroup 需要一个 FormGroup 实例。请传一份

    情况 我正在尝试在我的 Ionic 2 应用程序中制作一个非常简单的登录表单 无论我尝试什么 我都会不断收到此错误 formGroup expects a FormGroup instance Please pass one in 代码 页
  • 使用命令行参数从 python 制作 exe 文件

    我想从使用命令行参数 argv 的 python 脚本创建一个 exe据我所知 py2exe 不支持命令行参数 我能做些什么 编辑 我使用的是 GUI2Exe 工具 所以我错过了控制台标志 但接受的答案是完全正确的 setup consol
  • PHP 正斜杠匹配

    PHP 中如何检查某个字符串是否包含正斜杠 检查是否出现strpos http php net manual en function strpos php if strpos string FALSE Found 以整数形式返回位置 如果未
  • 如何读取 git-ls-tree 输出的模式字段

    git ls tree fb3a8bdd0ce 100644 blob 63c918c667fa005ff12ad89437f2fdc80926e21c gitignore 100644 blob 5529b198e8d14decbe4ad
  • CSS 中类的通配符 *

    我有这些我正在设计的 div tocolor 但我还需要唯一标识符 1 2 3 4 等 因此我将其添加为另一个类tocolor 1 div class tocolor tocolor 1 tocolor 1 div div class to
  • 使用 C++ WinAPI 在 Windows 10 上设置亮度

    我正在尝试在 Windows 10 计算机上设置亮度 显示器好像不支持setMonitorBrightness and setDeviceGammaRamp改变伽玛 白点等 所以我尽量不使用它 我正在尝试使用它来使其工作IOCTL VIDE
  • SendGrid API 动态 - 无法取消订阅工作

    我刚刚使用 sendgrid 设置了一封动态电子邮件 并使用 API 根据我的客户数据填写它 除了取消订阅部分之外 我已经完成了所有工作 我的电子邮件底部有一个取消订阅块 在代码编辑器中 代码如下所示 div class module st
  • Spark Streaming 无法从单个文件读取流数据

    我正在尝试从使用 Spark 流 API textFileStream 连续附加的文本文件中读取流数据 但无法使用 Spark Streaming 读取连续数据 Spark中如何实现呢 这是预期的行为 为了基于文件的源 https spar