在 Apache Beam 中如何处理 Pipeline-IO 级别的异常/错误

2023-12-12

我正在使用 Spark runner 作为 Apache Beam 中的管道运行程序,并发现错误。 通过得到错误,我的问题提出了。我知道错误是由于 sql 查询中的 Column_name 不正确造成的,但我的问题是如何在 IO 级别处理错误/异常

org.apache.beam.sdk.util.UserCodeException: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:70)
at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:145)
at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
18/11/01 13:13:16 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): org.apache.beam.sdk.util.UserCodeException: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
    at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:70)
    at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:145)
    at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
    at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    ..............
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:536)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:115)
    at com.mysql.cj.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:1983)
    at com.mysql.cj.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1826)
    at com.mysql.cj.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1923)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:601)

你必须创建一个自定义异常处理程序类来捕获该异常,例如;

需要实现这样的自定义方法

public Mycust_Exception(String string) {
    super("Error Obtained by "+string);
}

这里我刚刚返回了字符串,但也可以使用抛出super()现在你需要在你期望出现异常的地方声明 try-catch 块并遵循PTransformation_level_exceptionHandler_implementation

并在 catch 块中像这样调用 throw 语句

throw new Ezflow_Exception("Invalid statement");

这个实现肯定可以满足您的大部分查询。 对于Java编程来说,它是最常见的实现方式之一

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Apache Beam 中如何处理 Pipeline-IO 级别的异常/错误 的相关文章

随机推荐

  • 什么是 JAXB?为什么要使用它? [关闭]

    Closed 这个问题是基于意见的 目前不接受答案 这里有人发誓 JAXB 是自切片面包以来最伟大的东西 我很好奇 Stack Overflow 用户认为 JAXB 的用例是什么 以及是什么使它成为该用例的好或坏解决方案 我是用于操作 XM
  • JavaFX 和 Spring Boot - NPE

    我仍在与我的问题作斗争 我想使用 Spring Framework 来注入依赖项 并且必须使用 Spring boot 来集成两者 不幸的是 在第一个视图中 自动装配运行正确 但如果我进入下一个阶段 我仍然只能Null Pointer Ex
  • 是否可以检查对象是否已附加到实体框架中的数据上下文?

    当尝试附加已附加到给定上下文的对象时 我收到以下错误context AttachTo ObjectStateManager 中已存在具有相同键的对象 ObjectStateManager 无法跟踪具有相同键的多个对象 有没有一种方法可以实现
  • Xcode 4.4.1:git 不再显示修改

    不幸的是 突然间 我无法确定 Xcode 停止显示 git 信息的发生时刻 也无法从 Xcode 内部通过 git 提交 恢复 等 如果我检查组织者的存储库 我的项目仍会列出其上次提交 使用命令行 git status 我可以看到所有未暂存
  • PHP 中的 Session 和 Cookie 有什么区别?

    两者有什么区别Sessions and Cookies in PHP Cookie 是浏览器存储的一点数据 并随每个请求发送到服务器 会话是存储在服务器上并与给定用户关联的数据集合 通常通过包含 ID 代码的 cookie
  • 谷歌+登录,安卓登录时反复出现唱机提示

    我是安卓开发新手 我正在将 google plus 登录集成到我的应用程序中 我有点困惑谷歌的歌唱提示重复出现 我不明白为什么 我有什么问题吗 我正在关注谷歌开发者网站做这个东西 我使用多个谷歌帐户测试了该应用程序 但其中 2 个工作正常
  • 如何从 Scrapy 获取 UTF-8 编码的 unicode 输出?

    耐心听我说 我之所以写下每一个细节 是因为工具链的很多部分都不能很好地处理 Unicode 并且不清楚是什么原因导致的 PRELUDE 我们首先设置并使用最近的 Scrapy source scrapy 1 1 2 bin activate
  • 如何使用 iPhone sdk 将 XML 字符串转换为 JSON

    我正在实现一个基于客户端的应用程序 我有一个 xml 字符串 我需要将其转换为 JSON 格式并发送到服务器 我不知道如何转换这个 你们能给我建议任何文档或想法吗 步骤 1 将 XML 读入 NSDictionary http troybr
  • 在 Eclipse 中使用 -parameters 选项进行反射

    要使用 java 反射获取方法参数名称 我们必须使用 parameters 选项编译 Java 类 但是当我使用 VM Arguments 从 eclipse 中执行此操作时 我在控制台中收到以下错误 Unrecognized option
  • OpenCSVSerde escapeChar 覆盖 quoteChar

    我有许多 csv 文件正在导入到 Hive 中 并且我发现新行的 escapeChar 会被触发 即使它位于引用字段 即我的 quoteChar 内 有什么简单的方法可以解决这个困境吗 Line1field1 text Line1field
  • 是否可以在 div 区域中包含 box-shadow 来响应单击事件?

    我有一个div充当圆形按钮 它的设计使其整体外观的很大一部分来自于box shadow div style width 50px height 50px background yellow border none div 我有一个附加到按钮
  • Common Lisp:使用与 Lisp 进程不同的工作目录启动子进程

    假设我有一个目录 A 和子目录 B 我 cd 进入 A 并启动 lisp 在该 lisp 进程中 我想启动一个 Python 子进程 其中 Python 将 B 视为其当前工作目录 lisp 进程需要在 A 中拥有 cwd 而 python
  • Youtube Data API v3:喜欢另一个用户的评论

    我想使用 Youtube Data API 来点赞评论 当我尝试喜欢自己的评论时 它可以正常工作 没有任何问题 但是当我尝试喜欢其他人的评论时 我会收到错误 reason invalidCommentMetadata message The
  • mysql_fetch_assoc():提供的参数不是有效的 MySQL 结果资源[重复]

    这个问题在这里已经有答案了 可能的重复 警告 mysql fetch array 提供的参数不是有效的 MySQL 结果 我真的坚持这一点 我收到了这个错误 mysql fetch assoc 提供的参数不是 文件名 中有效的 MySQL
  • LINQ 中的动态 where 子句?

    我正在尝试根据动态条件加载数据 string tempQry string Empty if string IsNullOrEmpty cusid string IsNullOrEmpty mktid tempQry x gt x Mark
  • Scrapy安装(需要Microsoft Visual C++ 14.0)

    我已经尝试通过命令安装 scrapy 好几天了 pip install scrapy 下载要求后 我收到此错误代码 error Microsoft Visual C 14 0 is required Get it with Microsof
  • Tridion:替换 Query.QueryOperator 方法?

    Query QueryOperator AND Field我们在 Tridion R5 3 VBscript 模板中使用了这种方法 效果很好 最近 在迁移到Tridion 2011 SP1时 我们尝试使用此方法 但它不起作用 我们了解到该方
  • 通过 pandas 坐标数据框查找单元格中的点

    我必须通过两个 pandas 数据框找到哪些点位于方形单元格网格内 给定点坐标和单元格边界坐标 我正在打电话dfc包含代码和单元格边界坐标的数据框 我简化了问题 在实际分析中我有一个大网格 其中包含地理点和大量要检查的点 Code minx
  • Openshift 缺少创建文件的权限

    spring boot应用程序部署在openshift 4上 该应用程序需要在nfs share上创建一个文件 openshift 容器已在 NFS 类型上配置卷挂载 openshift 上的容器创建一个具有随机用户 ID 的 pod 如下
  • 在 Apache Beam 中如何处理 Pipeline-IO 级别的异常/错误

    我正在使用 Spark runner 作为 Apache Beam 中的管道运行程序 并发现错误 通过得到错误 我的问题提出了 我知道错误是由于 sql 查询中的 Column name 不正确造成的 但我的问题是如何在 IO 级别处理错误