avro 类型的 createDataFrame 中的无限递归

2024-02-01

在此示例中,我从 createDataFrame 调用内部收到 StackOverflowError 。它起源于涉及 java 类型推断的 scala 代码,该代码在无限循环中调用自身。

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>()
{
    public Event call(String line) throws Exception
    {
        Event event = parser.parse(line);
        log.info("event: "+event.toString());
        return event;
    }
});
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

堆栈跟踪的底部如下所示:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

这看起来与中报告的错误类似http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html但我使用的是 Spark 1.4.1,它比修复此错误时要晚。

Event 类由 avro 从该 avsc 生成。它确实包含 double 和 long 字段,据报道这会导致问题,但用字符串替换 double 不会改变症状。

{
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}

有人可以建议吗?谢谢!


Spark-avro 项目正在开展工作来解决此问题,请参阅:https://github.com/databricks/spark-avro/pull/217 https://github.com/databricks/spark-avro/pull/217 and https://github.com/databricks/spark-avro/pull/216 https://github.com/databricks/spark-avro/pull/216

一旦合并,应该有一个函数将 Avro 对象的 RDD 转换为 DataSet(行的 DataSet 相当于 DataFrame),而不会出现生成类中 getSchema() 函数的循环引用问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

avro 类型的 createDataFrame 中的无限递归 的相关文章

随机推荐

  • Docker compose postgresql 服务 - 在构建过程中无法创建用户和数据库?

    我已经在这上面浪费了一整天的时间 并且说我对本应简单的任务的不必要的复杂性没有留下深刻的印象 这是一种严重的轻描淡写 好吧 说完之后 我正在使用 docker machine docker compose postgresql 和 redi
  • 如何在 Oracle 中转义 regexp_replace?

    我正在为字符串创建一个小的replaceParam函数 并且希望能够转义替换 例如 G select regexp replace ABC ABC ABC XXX from dual leads to XXX XXX 但我希望能够逃脱替换
  • 为什么 boostuniform_int_distribution 采用闭范围(而不是半开范围,遵循常见的 C++ 用法)?

    标题说明了一切 甚至还有一个warning http www boost org doc libs 1 55 0 doc html boost random tutorial html在文档页面中 警告 与常见的 C 用法相反uniform
  • 打开 AVD 管理器时出错

    在模拟器上运行代码时 我遇到了常见问题 控制台消息是 2013 01 05 19 39 15 Doodlz Android Launch 2013 01 05 19 39 15 Doodlz adb is running normally
  • 如何将动画 GIF 写入 iOS 相机胶卷?

    如何将动画 GIF 写入 iOS 相机胶卷 我知道照片库应用程序无法播放动画 但例如我应该能够在发送电子邮件等时导入它 我试过了 UIImageWriteToSavedPhotosAlbum UIImage imageWithData se
  • 更改列表项选择的视图属性

    我有一个包含自定义行的 ListView 此自定义行具有以下 UI 元素 图像视图图像视图1 图像视图2 文本视图文本视图1 文本视图2 文本视图3 要求是每当选择列表行时都会发生以下更改 imageView1背景 颜色改变 imageVi
  • 重置到 Git 中的第一个提交?

    有没有什么相当于 root标志在rebase命令为reset命令 git reset root 假设我想重置到当前分支中的第一个提交 我是否必须手动挖掘历史记录并找到该提交的哈希值 或者是否有一种简单的方法来重置到第一个可用的提交 根提交
  • Xcode 是否有更好的更新系统?

    Xcode 4 0 1 几天前发布了 这意味着我再次下载 4 5 GB 的野兽来更新 有谁知道苹果是否计划推出更好的更新系统 这些天我在等待洪流 不像下载到 80 却失去连接那么令人沮丧
  • Laravel Composer 安装出现错误“您的锁定文件不包含兼容的软件包集,请运行 Composer update”

    我编写 Laravel 代码已经有一段时间了 目前 我尝试从 github 克隆一个项目并在本地进行编辑 我在项目目录中安装了 Composer 但未包含供应商文件夹 我尝试运行composer install但我给了我这个错误 Your
  • gitlab - 使用 access_token 推送到存储库

    我实现了 oauth2 Web 流程 以便从我的应用程序的用户获取 access token 使用 access token 我想执行以下操作 获取用户信息 为该用户创建一个存储库 将代码推送到此存储库 使用 git push 我已经成功获
  • 变量中缀到前缀到后缀

    我在互联网上搜索了一个很好的实现 将变量表达式从中缀表示法转换为前缀和后缀 而不是数字表达式 我所做的所有搜索都没有成功 基本上我想看看 PHP 中是否有任何实现 这样我可以修改它以支持更多运算符 而不仅仅是 例如转换 a b c p c
  • java泛型通配符

    我对 Java 泛型类型中通配符的使用有疑问 它们之间的基本区别是什么 List
  • openCV 滤波器图像 - 用局部最大值替换内核

    关于我的问题的一些详细信息 我正在尝试在 openCV 中实现角点检测器 另一种内置算法 Canny Harris 等 我有一个充满响应值的矩阵 最大响应值为 检测到角点的最大概率为 我有一个问题 在一个点的附近检测到很少的角 但只有一个
  • 将所有列表值合并到地图中[重复]

    这个问题在这里已经有答案了 我想将地图转换为 Map
  • 注册表部分的脚本功能

    我们的软件支持一系列 70 文件关联 用户可以选择与我们的应用程序关联 直接在Registry安装程序部分并使用标志来控制卸载期间的行为以及Check标志来控制是否应将其写入注册表 用户可以通过自定义页面控制要设置的关联CheckListB
  • 如何解决Python中稀疏矩阵的“NaN或无穷大”问题?

    我对 python 完全陌生 我使用了一些在网上找到的代码 并尝试对其进行处理 因此 我正在创建一个文本文档矩阵 并且想在训练逻辑回归模型之前添加一些额外的功能 虽然我已经用 R 检查了我的数据并且没有收到错误 但是当我运行逻辑回归时 我收
  • Gstreamer 不会下沉到命名管道

    当 gst launch 管道的接收器是命名管道与普通文件时 我会得到不同的行为 我有一个 gst launch 管道 它在 OMAP 嵌入式 linux 板上显示来自摄像机的视频 并通过 Tee 以 avi 形式提供视频 gst laun
  • 如何在当前命名空间中获取Python交互式控制台?

    我想让我的 Python 代码在运行代码的过程中使用 code interact 之类的东西启动一个 Python 交互式控制台 REPL 但是 code interact 启动的控制台看不到当前命名空间中的变量 我该怎么做 mystrin
  • Python 中 case/switch 语句的等效项是什么? [复制]

    这个问题在这里已经有答案了 Python 是否有等效的switch陈述 Python 3 10 及以上版本 在 Python 3 10 中 他们引入了模式匹配 示例来自Python 文档 https docs python org 3 10
  • avro 类型的 createDataFrame 中的无限递归

    在此示例中 我从 createDataFrame 调用内部收到 StackOverflowError 它起源于涉及 java 类型推断的 scala 代码 该代码在无限循环中调用自身 final EventParser parser new