在 RDD 方法/闭包中使用 SparkContext hadoop 配置,例如 foreachPartition

2023-11-22

我正在使用 Spark 读取一堆文件,详细说明它们,然后将它们全部保存为序列文件。我想要的是每个分区有 1 个序列文件,所以我这样做了:

SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
                .setMaster("local[2]")
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
        //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

        JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
                        throws Exception {
                  [°°°SOME STUFF°°°]
                  SequenceFile.Writer writer = SequenceFile.createWriter(
                                     jsc.hadoopConfiguration(), 
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context? 
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way"

有谁知道如何使用 Hadoop 配置对象insideRDD 关闭?


这里的问题是 Hadoop 配置没有标记为Serializable,所以 Spark 不会将它们拉入 RDD 中。它们被标记为Writable,因此 Hadoop 的序列化机制可以对它们进行编组和解组,但 Spark 不能直接使用它

两个长期修复选项是

  1. 添加对 Spark 中可写序列化的支持。或许SPARK-2421?
  2. 使 Hadoop 配置可序列化。
  3. 添加对序列化 Hadoop 配置的显式支持。

对于使 Hadoop 配置可序列化,您不会遇到任何重大反对意见;前提是您实现了自定义的 ser/deser 方法,该方法委托给可写 IO 调用(并且仅迭代所有键/​​值对)。我是作为 Hadoop 提交者这么说的。

Update:以下是创建可序列化类的代码,该类可编组 Hadoop 配置的内容。创建它与val ser = new ConfSerDeser(hadoopConf);在你的 RDD 中将其引用为ser.get().

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

 import org.apache.hadoop.conf.Configuration

/**
 * Class to make Hadoop configurations serializable; uses the
 * `Writeable` operations to do this.
 * Note: this only serializes the explicitly set values, not any set
 * in site/default or other XML resources.
 * @param conf
 */
class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  private def writeObject (out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject (in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}

请注意,对于某些人来说,将其设为对所有 Writeable 类通用是相对简单的;您只需要在构造函数中提供一个类名,并在反序列化期间使用它来实例化可写对象。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 RDD 方法/闭包中使用 SparkContext hadoop 配置,例如 foreachPartition 的相关文章

  • Google 地图查询返回的 JSON 包含像 \x26 这样的编码字符(如何解码?)

    在 Java 应用程序中 我获取 JSON 来自 Google 地图 其中包含以下字符 x26我想将其转换为其原始字符 据我所知 这是一个 UTF 8 表示法 但我不完全确定 在源 JSON 中 可能会出现各种编码字符 例如 x3c div
  • MP3:一种以毫秒为单位获取任何给定字节位置的位置的方法?

    我创建了一个 servlet 它返回从客户端请求的任何给定字节位置开始的流 来自 MP3 文件 这允许客户端在任何给定字节位置立即开始播放 而无需进行任何本地查找 现在 我有一个滑块可以直观地显示进度 我正在使用当前字节位置来更新滑块 但是
  • 检查双精度值的等于和不等于条件

    我在比较两者时遇到困难double values using and 我创建了 6 个双变量并尝试进行比较If健康 状况 double a b c d e f if a b c d e f My code here in case of t
  • 如何打印整个字符串池?

    我想打印包含文字的整个字符串池String使用添加的对象intern 就在垃圾收集之前 JDK有没有隐式的方法来进行这样的操作 我们如何检查字符串池 EDIT The comment suggests that there may be a
  • 通过 InjectMocks Spy 注入对象

    我需要对一个类运行一系列单元测试 该类具有 Autowired Logger 实现 实现的基本思想是 Mock Logger logger InjectMocks TestedClass tested 但我想保存日志输出功能 Mockito
  • 方法断点可能会大大减慢调试速度

    每当向方法声明行添加断点 在 Intellij IDEA 或 Android Studio 中 时 都会出现一个弹出窗口 方法断点可能会大大减慢调试速度 为什么会这样戏剧性地减慢调试速度 是我的问题吗 将断点放在函数的第一行有什么不同 Th
  • 如何将 XMP XML 块序列化为现有的 JPEG 图像?

    我有许多 JPEG 图像 其中包含损坏的 XMP XML 块 我可以轻松修复这些块 但我不确定如何将 固定 数据写回图像文件 我目前正在使用 JAVA 但我愿意接受任何能让这项任务变得容易的事情 这是目标关于 XMP XML 的另一个问题
  • 如何在java中使jpeg无损?

    有没有人可以告诉我如何使用编写 jpeg 文件losslessjava中的压缩 我使用下面的代码读取字节来编辑字节 WritableRaster raster image getRaster DataBufferByte buffer Da
  • 从 html 页面和 javascript 调用 java webservice

    我正在尝试从 javascript 调用 java 实现的 Web 服务 使用 NetBeans IDE 我读过很多关于 jQuery 和 AJAX 的内容 但我似乎无法掌握它 假设我的 Web 服务 WSDL 位于 http localh
  • 从 Stax XMLStreamReader 读取以解组部分

    我正在使用 Stax 游标 API 从大型 xml 文件中提取数据 当前 我转到特殊标签的开头并使用 JAXB 解组该标签 这对于格式良好的 xml 文件效果很好 但不久前我有一个文档 其中数十万个标签中有一个未关闭 JAXB 使用 XML
  • 来自客户端的超时 Web 服务调用

    我正在使用 RestEasy 客户端调用网络服务 一项要求是 如果调用运行时间超过 5 秒 则中止 超时调用 我如何使用 RestEasy 客户端实现这一目标 我只看到服务器端超时 即如果在一定时间内未完成请求 Rest Easy 网络服务
  • 我想要一个 Java 阿拉伯语词干分析器

    我正在寻找阿拉伯语的 Java 词干分析器 我找到了一个名为 AraMorph 的库 但它的输出是无法控制的 并且它会形成不需要的单词 还有其他阿拉伯语词干分析器吗 这是新的阿拉伯语词干分析器 Assem 的阿拉伯语轻词干分析器 http
  • 从java中的字符串数组中删除空值

    java中如何从字符串数组中删除空值 String firstArray test1 test2 test4 我需要像这样没有 null 空 值的 firstArray String firstArray test1 test2 test4
  • 从一个文本文件中获取数据并将其移动到新的文本文件

    我有一个文件 里面有数据 在我的主要方法中 我读入文件并关闭文件 我调用另一种方法 在原始文件的同一文件夹内创建一个新文件 所以现在我有两个文件 原始文件和通过我调用的方法生成的文件 我需要另一种方法 从原始文件中获取数据并将其写入创建的新
  • Java 编码风格、局部变量与重复方法调用

    我更喜欢使用局部变量而不是多次调用同一方法 I prefer this Vehicle vehicle person getVehicle if vehicle instanceof Car Car car Car vehicle car
  • 覆盖hadoop中的log4j.properties

    如何覆盖hadoop中的默认log4j properties 如果我设置 hadoop root logger WARN console 它不会在控制台上打印日志 而我想要的是它不应该在日志文件中打印 INFO 我在 jar 中添加了一个
  • 为什么 BufferedWriter 不写入文件?

    我有这个代码 String strings Hi You He They Tetrabenzene Caaorine Calorine File file new File G words txt FileWriter fWriter Bu
  • Java 推断泛型类型

    我正在寻找类似的推断捕获泛型类型的概念 类似于以下方法片段 但不是捕获泛型类型的类 public
  • java中如何找到class文件的包

    我正在编写一个使用 class 文件的 java 程序 我希望能够读取文件系统上的 class 文件 使用 InputStream 并确定它所在的包 该 class 文件可能不在一个好的包目录结构中 它可能位于某个随机位置 我怎样才能做到这
  • 尝试使用带有有效购买令牌的 Java Google Play Developer API v3 检索应用内购买信息时出现错误请求(无效值)

    当使用 Java Google Play Developer API 版本 3 并请求有效购买令牌的购买信息时 我收到以下异常 API 调用返回 400 Bad Request 响应以及以下消息 code 400 errors domain

随机推荐

  • 从命令行将 MySQL 变量传递给脚本

    我有一个 MySQL 更新脚本 我想从命令行运行 但我希望能够将阶段域变量传递给脚本 我知道这行不通 但这是我描述我正在尝试做的事情的最佳方式 uroot hlocalhost mydatabase execute SET domain m
  • SimpleMembership、MVC4、AuthorizeAttribute 和角色

    我正在尝试向我的一些 MVC4 控制器添加授权属性 只要它是一个计划 它们就可以正常工作 Authorize or Authorize Users myuser 但是当我添加任何类型的角色过滤时 它就会崩溃 例如 Authorize Rol
  • htmlentities() 与 htmlspecialchars()

    两者有什么区别htmlspecialchars and htmlentities 我什么时候应该使用其中之一 htmlspecialchars可能用过了 当不需要对具有 HTML 等效项的所有字符进行编码时 如果您知道页面编码与文本特殊符号
  • PHP - 密码验证问题

    我已经为此摸不着头脑两个多小时了 我研究过 stackoverflow 上的文章 包括 Bcrypt 未正确验证的问题 php的password hash和password verify问题不匹配 password verify 调用返回
  • DB (SQL) 自动压力/负载工具?

    我想测量数据库应用程序的性能和可扩展性 我正在寻找一种工具 允许我对数据库运行许多 SQL 语句 将数据库和脚本 SQL 文件作为参数 必要的详细信息 例如主机名 端口 登录名 理想情况下 它应该让我控制参数 例如模拟客户端的数量 测试持续
  • 无法加载脚本。确保您正在运行 Metro 服务器(运行“react-native start”)或者您的捆绑包“index.android.bundle”

    在 android studio 中运行我的项目时出现该错误 我刚刚安装了所有内容 并且一步一步地遵循了所有内容 我正在使用genymotion作为模拟器 并使用react native start从根目录 检查端口 8081 禁用即时运行
  • 在同一命名容器中重用 Facelets 组合时避免重复 id

    我有一个
  • c中的scanf和换行符[重复]

    这个问题在这里已经有答案了 我今天刚刚在 C 班进行了一次测试 我有理由相信答案可能是错误的 scanf d n x Evaluate the expression for the string 54321 n 这个想法非常简单 找到一个整
  • 如何制作固定div?

    我试图使框固定在页面的右下边框中 并且不会随着页面向下滚动而移动 但它对我不起作用 不知道为什么 这是我的代码 div class tooltip div class tooltip top 1 div div class tooltip
  • 将用 Python 训练的 XGBoost 模型移植到用 C/C++ 编写的另一个系统

    假设我已经用 python 成功训练了 XGBoost 机器学习模型 x train x test y train y test train test split x y test size 0 2 random state 7 model
  • 将视频设置为背景“图像”的方法

    我需要建立一个以视频为背景的网站 我可以使用吗z index将其他元素放置在视频顶部 有更好的选择吗 我没有测试过 但你可以尝试设置宽度 高度
  • 应用内购买以解锁付费功能

    我希望发布我的应用程序的两个版本 免费和付费 付费应用程序将具有更多功能 但没有额外的内容 最初我打算在市场上发布两个独立的应用程序 但事实证明很难保留单个代码库并拥有两个独立的应用程序 应用内购买是更好的方法吗 因此 我发布了一个免费应用
  • Java:枚举常量内方法和变量的定义

    我在做一些实验时不小心写了一段代码 这很奇怪 我不明白 我什至很惊讶我能编译它 它看起来像这样 enum Foo VALUE 1 public int myVariable 1 VALUE 2 public void myMethod VA
  • C++ 模块和动态链接的预期关系是什么?

    C 模块 TS 提供了一个出色的工具 可以消除预处理器 缩短编译时间 并且通常支持更健壮 模块化的 C 代码开发 至少对于非模板代码而言 底层机制提供对普通程序中符号导入和导出的控制 然而 有一个major开发用于两种动态加载的库时遇到的问
  • Mongoose 高级自定义架构对象类型

    我找不到任何例子advanced 自定义架构类型涉及自定义对象 或价值对象 在猫鼬 gt 4 4 中 想象一下我想使用自定义类型 例如 function Polygon c this bounds some data this npoint
  • 高效的 Javascript 字符串替换

    嘿 我有一个 HTML 块 我将重复使用它 在用户访问期间的不同时间 而不是一次 我认为实现此目的的最佳方法是创建一个 HTML div 隐藏它 并在需要时获取其innerHTML并对几个关键字执行replace 作为 HTML 块的示例
  • 如何允许将 NSAttributedString 文本输入到 UITextView 中?

    我试图允许在 UITextView 中输入不同样式的文本 有点像使用粗体或斜体等简单属性的文本编辑器 我通过使用 textView 来理解attributedText属性 我可以将属性应用于特定范围的文本 这很好 但我希望能够在 textV
  • 如何在SWT中显示文件的系统图标?

    我想显示类似于的文件树java2s com 创建惰性文件树 但包括实际的系统图标 特别是文件夹 SWT 似乎没有提供这个 程序 API 不支持文件夹 所以我想出了以下内容 public Image getImage File file Im
  • tcmalloc:静态编译时如何覆盖我的 malloc 调用?

    当我使用LD PRELOAD usr local lib libtcmalloc so 我对 malloc 的所有调用都变成了 tcmalloc 调用 然而 当我静态链接 libtcmalloc 时 我发现直接 malloc 被调用 除非我
  • 在 RDD 方法/闭包中使用 SparkContext hadoop 配置,例如 foreachPartition

    我正在使用 Spark 读取一堆文件 详细说明它们 然后将它们全部保存为序列文件 我想要的是每个分区有 1 个序列文件 所以我这样做了 SparkConf sparkConf new SparkConf setAppName writing