如何配置粘合书签以与 scala 代码一起使用?

2023-12-27

考虑 Scala 代码:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters.mapAsJavaMapConverter

object MyGlueJob {

  def main(sysArgs: Array[String]) {
    val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table")
      .getDynamicFrame()

    val processed = input.applyMapping(
      Seq(
        ("id",                                        "string", "id", "string"),
        ("my_date",                                   "string", "my_date", "string")
      ))
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = ""
    ).writeDynamicFrame(processed)
    Job.commit
  }
}

输入是使用 gzip 压缩的分区 json 文件,该文件按日期列分区。一切works- 数据以json格式读取,以orc格式写入。

但是,当尝试使用相同的数据运行作业时,它会再次读取并写入重复的数据。该作业中启用了书签。方法Job.init and Job.commit被调用。怎么了?

UPDATED

我添加了一个transformationContext参数为getCatalogSource and getSinkWithFormat:

        val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table", transformationContext = "transformationContext1")
      .getDynamicFrame()

and:

    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = "transformationContext2"
    ).writeDynamicFrame(processed)

现在魔法就这样“起作用”了:

  1. 第一次运行 - 好的
  2. 第二次运行(使用相同的数据或相同的数据和新的数据) - 它失败并出现错误(稍后)

第二次(及后续)运行后再次发生错误。 还有留言Skipping Partition {"my_date": "2017-10-10"}出现在日志中。

ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType(); org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:438)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:437)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:437)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:420)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at MobileArcToRaw$.main(script_2018-01-18-08-14-38.scala:99)

胶水书签究竟是怎么回事???哦


您是否尝试过设置transformationContext源和接收器的值是否相同?它们当前在上次更新中设置为不同的值。

transformationContext = "transformationContext1"

and

transformationContext = "transformationContext2"

我在使用胶水和书签时也遇到过这个问题。我正在尝试执行类似的任务,其中读取按年、月和日分区的分区 JSON 文件,每天都会有新文件到达。我的工作运行一个转换来提取数据的子集,然后放入 S3 上的分区 Parquet 文件中。

我使用的是 Python,因此 DynamicFrame 的初始实例化如下所示:

dyf = glue_context.create_dynamic_frame.from_catalog(database="dev-db", table_name="raw", transformation_ctx="raw")

最后像这样接收到 S3:

glue_context.write_dynamic_frame.from_options( frame=select_out, connection_type='s3', connection_options={'path': output_dir, 'partitionKeys': ['year', 'month', 'day']}, format='parquet', transformation_ctx="dev-transactions" )

最初,我运行了该作业,并在启用书签的情况下正确生成了 Parquet。然后我添加了新一天的数据,更新了输入表上的分区并重新运行。第二个作业将失败并出现如下错误:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'year' given input columns: [];;\n'Project ['year, 'month, 'day, 'data']

改变transformation_ctx是相同的(dev-transactions在我的例子中)使该过程能够正常工作,仅处理增量分区并为新分区生成 Parquet。

关于一般书签以及如何使用转换上下文变量的文档非常稀疏。

Python 文档只是说:(https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html):

conversion_ctx – 要使用的转换上下文(可选)。

Scala 文档说(https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html):

conversionContext — 与作业书签使用的接收器关联的转换上下文。默认设置为空。

由于文档解释得不好,我所能观察到的最好情况是,转换上下文用于在已处理的源数据和接收器数据之间形成链接,并且定义不同的上下文会阻止书签按预期工作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何配置粘合书签以与 scala 代码一起使用? 的相关文章

随机推荐

  • 显示:内联表

    IE 7 不支持display inline table 其他浏览器支持 对于替代解决方案我应该做什么 火狐和 IE8 支持display inline table IE6支持display inline table class inlin
  • 从 data.frame 到 ggplot2 图例的表达式

    我想向图例条目添加一个表达式 而不直接输入图例 因为我正在循环变量 本质上我想要这样 d lt data frame x 1 10 y 1 10 f rep c 0 74 gt 75 each 5 qplot x y data d colo
  • LoopBack:如何在代码中动态创建自定义 REST 端点(即时)

    我们使用 LoopBack REST 框架来公开我们的数据库 和业务逻辑 我们需要允许客户在数据库 单租户和多租户 中创建自定义表 这些表可以通过 REST 端点进行访问 所有客户都需要使用相同的通用 生产 REST 端点 这些端点将公开在
  • Flutter 是否支持 FieldPath?

    我找不到FieldPath in the cloud firestore颤振插件 https pub dartlang org packages cloud firestore但是 我认为这是一个非常常见的工具 将是此类插件的第一个实现之一
  • NativeScript WebView在默认浏览器中打开url

    我正在尝试构建应用程序WebView以及 WebView 内 URL 上的单击 点击事件 下面的解决方案打开外部浏览器和 URL 但它也在 webview 中加载相同的 url 内容 有没有办法阻止在 webview 中加载新的 url 这
  • 运行“npm install”时保留符号链接

    如果我们这样做 npm link x 然后我们跑npm install 它将覆盖符号链接包 有没有办法跑npm install不覆盖符号链接包 就像是 npm install preserve symlinks or npm install
  • 如何防止单击锚元素内的图像时的链接行为?

    我有一个与此类似的代码 a href link html goto link page img src images edit gif alt a 现在 如果您单击文本 我希望 href 链接能够正常工作 但是如果您单击图像 它应该执行其他
  • 在 Safari 中跳转输入字段

    我正在尝试重新创建一个非常酷的占位符用户界面 http dribbble com shots 1254439 GIF Mobile Form Interaction list users只使用 HTML 和 CSS 我就差不多明白了 dem
  • Vulkan 的 VkMemoryHeapFlagBits 是否缺少值?

    在 Vulkan 规范 1 0 9 第 180 页 中 我们有以下内容 typedef struct VkMemoryHeap VkDeviceSize size VkMemoryHeapFlags flags VkMemoryHeap 和
  • C# - 值类型的引用包装器

    我想用c Pointtype 作为引用类型 它是一个结构 我想到了上课CPoint 其中将包含一个Point成员 有什么办法可以提高会员人数吗 Point担任成员Cpoint 我正在努力避免 cpoint point X cpoint po
  • SQL Server 从表中读取 csv 二进制文件

    我目前将 csv 格式的文件存储在磁盘上 然后像这样查询它们 SELECT FROM OPENROWSET BULK C myfile csv FORMATFILE C format fmt FIRSTROW 2 AS rs 其中 form
  • javascript 中计数器变量的奇怪值[重复]

    这个问题在这里已经有答案了 可能的重复 Javascript 臭名昭著的循环问题 https stackoverflow com questions 1451009 javascript infamous loop problem 由于某种
  • Django 1.9:django.core.exceptions.AppRegistryNotReady:应用程序尚未加载

    我正在尝试使用这个应用程序https github com benliles django chance https github com benliles django chance在我的应用程序中 我的 Django 版本是 1 9 我
  • 底部对齐 R 闪亮按钮

    我无法找到底部对齐的方法downloadButton with a selectizeInput i e library shiny runApp list ui shinyUI fluidPage fluidRow align botto
  • 如何在Android上像instagram一样实现视频过滤器[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我需要在我的 Android 应用程序中添加视频过滤器 例如Instagram 我搜索了很多 但没有找
  • 打印 HasMorePages 不起作用 c#

    好吧 所以我一直在四处寻找 在 SO 和 Google 上 看看一个问题是否可以解决我的错误 但显然不能 所以这里是 我正在尝试打印有时会超过一页的内容 并且我正在检查要打印的页数 检查完所有这些逻辑后 我使用HasMorePages属性来
  • 使用 JSlider 实时更新 jFreeChart 的透明度

    我想问这个问答问题的后续问题 JFreeChart 可见后如何更新其外观 https stackoverflow com questions 5522575 how can i update a jfreecharts appearance
  • 为什么并行 for_each 需要前向迭代器?

    我正在设计一个遍历多个容器的迭代器 因此有一个代理对象作为返回类型 因此 它最多只能成为一个输入迭代器 这是因为前向迭代器需要reference是一个实际的引用类型 但据我所知 这对于输入迭代器来说并非如此 让我说 简单for each与我
  • 点符号解除分配?

    property copy NSString name property copy NSString orbit property copy NSNumber mass property float surfaceTemp property
  • 如何配置粘合书签以与 scala 代码一起使用?

    考虑 Scala 代码 import com amazonaws services glue GlueContext import com amazonaws services glue util GlueArgParser Job Jso