在spark中连接mongodb时出现异常

2024-02-28

在尝试使用 MongoDB 作为输入 RDD 时,我在 org.bson.BasicBSONDecoder._decode 中收到“java.lang.IllegalStateException:未就绪”:

Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");

JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);

System.out.println(rdd.count());

我得到的例外是: 2006 年 8 月 14 日 09:49:57 信息 rdd.NewHadoopRDD:输入分割:

MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false} 14/08/06 09:49:57 
WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException 
java.lang.IllegalStateException: not ready
            at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
            at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
            at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
            at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
            at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
            at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
            at java.lang.reflect.Method.invoke(Method.java:618)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
            at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
            at java.lang.Thread.run(Thread.java:804)

所有程序的输出是here https://jira.mongodb.org/secure/attachment/49217/exception.txt

环境:

  • Redhat
  • 火花1.0.1
  • Hadoop 2.4.1
  • MongoDB 2.4.10
  • 蒙戈-hadoop-1.3

我想我已经发现了这个问题:mongodb-hadoop 在 core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java 中的 BSON 编码器/解码器实例上有一个“静态”修饰符。当 Spark 在多线程模式下运行时,所有线程都会尝试使用same编码器/解码器实例,预计会产生不好的结果。

补丁在我的github上here https://github.com/adq/mongo-hadoop/commit/d7db2e2a3e1bc20e3fbdbe97f01e9cf2ef5f0d10(已向上游提交了拉取请求)

我现在可以从 Python 运行 8 核多线程 Spark->mongo 集合 count()!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在spark中连接mongodb时出现异常 的相关文章

  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • 为关联数组选择哪种映射类型?学说ODM

    我有一个关于 顺便说一句 真的很棒 Doctrine ODM 的简单问题 假设您有一个类似以下的文档 Document class Test Id public id WHICHTYPE public field array 现在我想存储一
  • MongoDB 中两个集合之间的 Diff()

    我做过研究 如果这是一个重复的问题 我很抱歉 但其他问题的解决方案并不适合我 因此 我提出了一个新问题 使用 Javascript 比较两个集合的最佳方法是什么 我有数千个这样的 Mongo 文档格式的标头 url google com h
  • Hadoop-reducer 如何获取数据?

    据我所知 映射器为每个减速器生成 1 个分区 减速器如何知道要复制哪个分区 假设有 2 个节点运行用于字数统计程序的映射器 并且配置了 2 个缩减器 如果每个映射节点生成 2 个分区 并且两个节点中的分区都可能包含相同的单词作为键 那么减速
  • MongoDB 如何选择候选计划

    我的应用程序中的查询速度很慢 创建两个索引后 它在本地数据库中使用它们以获得更好的性能 但是当我部署在生产数据库上时 它仍然使用原始索引 下面是我所做的 集合中的属性tasks team id project id created by a
  • MongoDB insertMany 并跳过重复项

    我试图insertMany https docs mongodb com manual reference method db collection insertMany 项目进入我的 Mongo 数据库 但我想跳过重复的 ID 我在用着N
  • 返回吃异常

    我至少发现了以下行为weird def errors try ErrorErrorError finally return 10 print errors prints 10 It should raise NameError name E
  • 从异常中提取类和文件名

    是否可以从异常对象中提取类名和文件名 我希望将更好的日志记录集成到我的应用程序中 并且我想包含异常发生位置的详细信息 在 MVC 中 Stacktrace 不返回文件名和类名 我有点不知道在哪里寻找它们 Thanks 您可以创建一个Stac
  • 将 Mongodb 与 Android 应用程序连接

    我正在尝试构建 Android 应用程序来连接到 MongoDB 一直被这个问题困扰 MongoDB 是可访问的 但没有安全性 可以通过手机使用 Mono Explorer 添加数据 public void sendMessage View
  • MongoDB:仅获取过去 24 小时内创建的文档?

    我想限制我所做的查询仅查看过去 24 小时内创建的文档 构造此查询的最佳方式是什么 如何根据日期进行限制 Add createdAt字段 索引它 然后查询 db getCollection COLLECTION NAME find crea
  • 如何在 Spring MongoDB 聚合上投影 DBRef?

    我在 MongoDB shell 中完成了以下聚合 以获取每个用户每种类型的警报数量 db getCollection alerts aggregate unwind son group id son son level level cou
  • 如何从 JSON 创建 Mongoose 模式

    我是 mongodb nodejs 和 mongooseJS 的新手 最近 我一直在尝试为我的 JSON 创建猫鼬模式 endpoints a z poi location name a latitude 10 1075702 longit
  • Django:无法为用于检索数据的模型实例化抽象模型

    我正在开发一个项目 该项目有一个 Djongo 抽象模型和一个主模型 当我尝试插入一个值时 它被插入而没有错误 但是当我尝试检索数据时 我得到 抽象模型无法实例化 这是我的模型 class Exam questions models Mod
  • 使用 python 字典更新 MongoEngine 文档?

    是否可以使用 python 字典更新 MongoEngine 文档 例如 class Pets EmbeddedDocument name StringField class Person Document name StringField
  • Spark Dataframe 中的分析

    在这个问题中 我们有两个经理 M1 和 M2 在经理 M1 的团队中有两个员工 e1 和 e2 在 M2 的团队中有两个员工 e4 和 e5 以下是经理和员工的层次结构 1 M1 a e1 b e2 2 M2 a e4 b e5 我们有以下
  • 有没有办法防止 Visual Studio 因特定方法中的异常而中断?

    我知道我可以根据异常的类型以及最终使用 异常 对话框捕获异常的事实来控制 Visual Studio 处理异常的方式 但是 我有一个在内部抛出 并捕获 一个库ArgumentOutOfRange当我调用特定方法时出现异常 抛出异常 并被库捕
  • 当 JMS Prod 位于辅助 POJO 类中时,如何在事务中包含 JMS Producer

    简短的问题 有没有办法强制无状态 EJB 调用的 POJO 存在于 EJB 的上下文中 以便事务和资源注入可以在 POJO 中工作 具体来说 在我想要做的事情的上下文中 如何在 EJB 的事务中包含 POJO JMS 生产者 该生产者在调用
  • Pyspark - 一次聚合数据帧的所有列[重复]

    这个问题在这里已经有答案了 我想将数据框分组到单个列上 然后对所有列应用聚合函数 例如 我有一个包含 10 列的 df 我希望对第一列 1 进行分组 然后对所有剩余列 均为数字 应用聚合函数 sum 与此等效的 R 是 summarise
  • 为什么我的析构函数中的异常没有触发 std::terminate?

    我很清楚不应该在析构函数中抛出任何异常这一事实 但作为掌握这个概念的一部分 我编写了这个示例 include
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late

随机推荐

  • Linux 和 Windows 上的 Python sys.maxint、sys.maxunicode

    在 64 位 Debian Linux 6 上 Python 2 6 6 r266 84292 Dec 26 2010 22 31 48 GCC 4 4 5 on linux2 Type help copyright credits or
  • 如何判断用户是否选择了浅色主题或深色主题

    有没有办法判断用户是否选择了浅色主题或深色主题 Thanks 有一个属性可以对此进行测试 而不是比较实际的资源颜色 Visibility v Visibility Resources PhoneLightThemeVisibility if
  • 函数 try 块是否允许我们解决异常?

    所以我正在阅读有关函数 try 块的内容link https www learncpp com cpp tutorial 14 7 function try blocks 还有一行描述了普通 try 块和函数 try 块之间的区别 如下所示
  • 一次性登录应用程序 - FirebaseAuth

    我正在开发一个使用 Firebase 身份验证通过电话号码登录用户的应用程序 我想添加一项功能 使用户只能一次性登录 即即使用户杀死应用程序并再次启动它 他也应该登录 另外 我不想添加注销功能 为此可以做什么 实现此目的的最简单方法是使用侦
  • java LocalDateTime 解析具有模式 MM/d/yyyy HH:mm:ss a 的日期的异常

    我在使用 java datetime API 执行下面的代码时遇到异常 String strDate 12 4 2018 5 26 28 PM DateTimeFormatter formatter DateTimeFormatter of
  • Spring 3 表达式语言如何与属性占位符交互?

    Spring 3 引入了新的表达语言 http docs spring io spring docs 3 0 x spring framework reference html expressions html SpEL 可以在 bean
  • 如何检测产品搜索中的拼写错误并提出可能的更正建议?

    给定一个非常大的产品名称数据库 您如何检测用户搜索中可能存在的拼写错误并建议可能的更正 有点像谷歌呈现它们的方式 E g 用户输入 fork handels 并按 搜索 他们回来了 没有结果 您是说 叉柄 吗 解决这个问题有几种方法 保留一
  • G++ 找不到 boost 库。我说他们就在众目睽睽之下

    我正在尝试构建一些代码 这是我收到的错误 main o In function static initialization and destruction 0 home jmbeck Downloads boost 1 48 0 boost
  • Visual Studio 2013 缺少转换为 Web 应用程序

    我正在管理一个旧的 Web 应用程序 它仍然具有 Framework 1 的遗留代码 你相信吗 目前使用 Framework 4 0 当我需要修复或升级网页时 我通过单击 转换为 Web 应用程序 将其转换为 Web 应用程序网页 VS 2
  • order Graphviz - 固定子图

    我喜欢创建一个像这样的 3 柱形图 Code digraph g rankdir LR node shape circle fontsize 14 fontsize 18 labeljust l rank same edge style i
  • Docker 容器超时?

    对于我在大学的论文 我正在研究一个编码排行榜系统 用户可以通过临时 Docker 容器编译 运行不受信任的代码 到目前为止 系统似乎运行良好 但我面临的一个问题是 当提交无限循环的代码时 例如 while True print infini
  • Twitter 回调 URL

    我正在使用 twitter SDK 与 ios 应用程序集成 当我遵循所有步骤并创建客户密钥并将其放入我的应用程序中时 当我运行应用程序时 它会显示 TwitterKit 确实遇到了消息错误 获取用户身份验证令牌时出错 错误域 TWTRLo
  • 如何在我自己的 C shell 中正确等待前台/后台进程?

    In this https stackoverflow com questions 873620 how do i clear this array pointer in c上一个问题我发布了大部分我自己的 shell 代码 我的下一步是实
  • 从子查询更新多列

    此类问题之前已被问过几次 但并不完全是我想要的 我需要SET两行等于子查询的不同部分 我目前正在使用 UPDATE records SET leads SELECT COUNT FROM leads table WHERE leads ta
  • 替换字符串中的字符列表

    我有一个字符串 它是网页的标题 所以它可以有 和其他特殊字符 我想编写一个函数 它将接受一个字符串并替换一个字符列表 试图找到最好的方法来做到这一点 我应该使用列表 数组或枚举来保存特殊字符列表 还是 java 中已经有一些东西可以做到这一
  • 如何使单选按钮看起来像切换按钮

    我希望一组单选按钮看起来像一组切换按钮 但功能仍然像单选按钮一样 它们不必看起来完全像切换按钮 我怎样才能只使用 CSS 和 HTML 来做到这一点 编辑 当选中 取消选中按钮时 我会满意地使小圆圈消失并更改样式 根据您想要支持的浏览器 您
  • 具有取消支持的 GetContextAsync()

    所以我正在旋转一个HttpListener等待一个OAuth2回复 在理想的情况下 当用户登录浏览器并且我们收到令牌时 该令牌只会存活几秒钟 我也希望这个有一个CancellationToken以便用户可以在延迟后停止收听 如果他们愿意 我
  • 迭代器块的测试覆盖率结果很奇怪,为什么这些语句没有执行?

    我正在使用 dotCover 来分析单元测试的代码覆盖率 并且得到一些奇怪的结果 我有一个迭代器方法 其覆盖范围不完整 但未覆盖的语句只是右大括号在该方法的末尾 这是我正在测试的方法 public static IEnumerable
  • 尝试将二进制文件部署到已存储不同二进制文件的位置

    当我从 tridio 2009 发布页面时 出现以下错误 Destination with name FTP Host servername Location RET Password Port 21 UserName retftp rep
  • 在spark中连接mongodb时出现异常

    在尝试使用 MongoDB 作为输入 RDD 时 我在 org bson BasicBSONDecoder decode 中收到 java lang IllegalStateException 未就绪 Configuration conf