(Hadoop) MapReduce - 链作业 - JobControl 不会停止

2023-11-23

我需要链接两个 MapReduce 作业。我使用 JobControl 将 job2 设置为依赖于 job1。 它有效,输出文件已创建!但它并没有停止! 在 shell 中它保持这种状态:

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1

我怎样才能阻止它? 这是我的主要。

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Configuration conf2 = new Configuration();

    Job job1 = new Job(conf, "canzoni");
    job1.setJarByClass(CanzoniOrdinate.class);
    job1.setMapperClass(CanzoniMapper.class);
    job1.setReducerClass(CanzoniReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    ControlledJob cJob1 = new ControlledJob(conf);
    cJob1.setJob(job1);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));


    Job job2 = new Job(conf2, "songsort");
    job2.setJarByClass(CanzoniOrdinate.class);
    job2.setMapperClass(CanzoniSorterMapper.class);
    job2.setSortComparatorClass(ReverseOrder.class);
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    job2.setReducerClass(CanzoniSorterReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));

    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);
    jobctrl.run();


    ////////////////
    // NEW CODE ///   
    //////////////


    // delete jobctrl.run();
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
    while (!jobctrl.allFinished()) {
      String status =cJob1.toString();
      String status2 =cJob2.toString();
      if (!status.equals(oldStatusJ1)) {
        System.out.println(status);
        oldStatusJ1 = status;
      }
      if (!status2.equals(oldStatusJ2)) {
        System.out.println(status2);
        oldStatusJ2 = status2;
      }     
     }
    System.exit(0);

} }


我基本上做了彼得罗上面提到的事情。

public class JobRunner implements Runnable {
  private JobControl control;

  public JobRunner(JobControl _control) {
    this.control = _control;
  }

  public void run() {
    this.control.run();
  }
}

在我的地图/归约类中,我有:

public void handleRun(JobControl control) throws InterruptedException {
    JobRunner runner = new JobRunner(control);
    Thread t = new Thread(runner);
    t.start();

    while (!control.allFinished()) {
        System.out.println("Still running...");
        Thread.sleep(5000);
    }
}

我只是传递 jobControl 对象。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

(Hadoop) MapReduce - 链作业 - JobControl 不会停止 的相关文章

  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • 如何通过Python访问Hive?

    https cwiki apache org confluence display Hive HiveClient HiveClient Python https cwiki apache org confluence display Hi
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • 将数据从 oracle 移动到 HDFS,处理并从 HDFS 移动到 Teradata

    我的要求是 将数据从 Oracle 移至 HDFS 处理HDFS上的数据 将处理后的数据移至 Teradata 还需要每 15 分钟执行一次整个处理 源数据量可能接近50GB 处理后的数据也可能相同 在网上搜索了很多之后 我发现 PRARO
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • 猪的组连接等效吗?

    试图在 Pig 上完成这个任务 寻找 MySQL 的 group concat 等效项 例如 在我的表中 我有以下内容 3fields userid clickcount pagenumber 155 2 12 155 3 133 155
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • Hive:如何分解嵌入 CSV 文件中的 JSON 列?

    从 CSV 文件 带有标题和管道分隔符 中 我得到了以下两个内容 其中包含一个 JSON 列 内部有一个集合 如下所示 第一种情况 使用没有名称的 JSON 集合 ProductId IngestTime ProductOrders 918
  • 如何跟踪hadoop中哪个数据块在哪个数据节点?

    如果复制一个数据块 会复制到哪个数据节点 是否有任何工具可以显示复制块存在的位置 如果您知道文件名 则可以通过 DFS 浏览器查找 转到您的 namenode Web 界面 说 浏览文件系统 并导航到您感兴趣的文件 在页面底部 将列出文件中
  • 将上下文管理器的动态可迭代链接到单个 with 语句

    我有一堆想要链接的上下文管理器 第一眼看上去 contextlib nested看起来是一个合适的解决方案 但是 此方法在文档中被标记为已弃用 该文档还指出最新的with声明直接允许这样做 自 2 7 版起已弃用 with 语句现在支持此
  • 名称节点处于安全模式

    我提到了这些问题名称节点处于安全模式 无法离开 https stackoverflow com questions 15803266 name node is in safe mode not able to leave and SafeM
  • Spark 写入 hdfs 无法使用 saveAsNewAPIHadoopFile 方法

    我在 CDH 5 2 0 上使用 Spark 1 1 0 并试图确保我可以读取和写入 hdfs 我很快意识到 textFile 和 saveAsTextFile 调用旧的 api 并且似乎与我们的 hdfs 版本不兼容 def testHD
  • Hadoop NoSuchMethodError apache.commons.cli

    我在用着hadoop 2 7 2我用 IntelliJ 做了一个 MapReduce 工作 在我的工作中 我正在使用apache commons cli 1 3 1我把库放在罐子里 当我在 Hadoop 集群上使用 MapReduceJob
  • 如何找到 JAR:/home/hadoop/contrib/streaming/hadoop-streaming.jar

    我正在练习有关 Amazon EMR 的复数视角视频教程 我被困住了 因为我收到此错误而无法继续 Not a valid JAR home hadoop contrib streaming hadoop streaming jar 请注意
  • Spark on Hive SQL 查询错误 NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT

    针对 Hive 2 1 0 提交 Spark 1 6 0 SQL 应用程序时出现错误 Exception in thread main java lang NoSuchFieldError HIVE STATS JDBC TIMEOUT a
  • HBase、Hadoop:如何估计 HBase 表或 Hadoop 文件系统路径的大小?

    我有多个 HBase 表 如何估计在 java 中使用的表的大致大小 一种方法是你必须使用java客户端访问hdfs 通常在 hbase文件夹 所有表格信息 将在场 Hadoop 外壳 你可以检查使用hadoop fs du h path
  • 当我将文件存储在 HDFS 中时,它们会被复制吗?

    我是 Hadoop 新手 当我使用以下方式存储 Excel 文件时hadoop fs putcommoad 它存储在HDFS中 复制因子为3 我的问题是 是否需要3份并分别存储到3个节点中 这是 HDFS 工作的漫画 https docs
  • 无法在 Windows 10 中启动 Spark Master

    我是 Spark 新手 我正在尝试手动启动 master 在 Windows 10 中使用 MINGW64 当我这样做时 Downloads spark 1 5 1 bin hadoop2 4 spark 1 5 1 bin hadoop2
  • Apache Spark 何时发生混洗?

    我正在优化 Spark 中的参数 并且想确切地了解 Spark 是如何对数据进行洗牌的 准确地说 我有一个简单的字数统计程序 并且想知道spark shuffle file buffer kb如何影响运行时间 现在 当我将此参数设置得非常高
  • MongoDB 存储过程等效项

    我有一个包含商店列表的大型 CSV 文件 其中一个字段是邮政编码 我有一个名为 ZipCodes 的独立 MongoDB 数据库 它存储任何给定邮政编码的纬度和经度 在 SQL Server 中 我将执行一个名为 InsertStore 的

随机推荐

  • 如何使用 FParsec 解析注释

    我正在尝试使用 FParsec 从 s 表达式语言解析 lisp 风格的注释 我在上一个线程中解析单行注释时得到了一些帮助 如何转换 FParsec 解析器来解析空格 虽然这个问题已经解决 但我仍然需要解析多行注释 这是当前的代码 Read
  • 使用 GUID 的一部分作为 ID

    我正在开发 ASP Net MVC 应用程序 我的行动之一需要id作为参数 例如 public actionresult Detail Guid id return View 正如你所看到的 我正在使用Guid代替Int 这个问题更具装饰性
  • 是否可以知道哪些 SciPy / NumPy 函数在多个内核上运行?

    我试图明确找出 SciPy NumPy 中的哪些函数在多个处理器上运行 我可以例如在 SciPy 参考手册中读到 SciPy 使用了这个 但我更感兴趣的是到底哪些函数确实运行并行计算 因为并非所有函数都这样做 理想的情况当然是当您键入 he
  • GHCi 中的功能非详尽模式

    我想创建一个显示列表最后一个元素的函数 这是我的代码 ghci gt let myLast a gt a ghci gt let myLast error ghci gt let myLast x x ghci gt let myLast
  • “重新打开上次关闭的选项卡”导致显示上次 ajax 请求内容

    我正在使用 HTML 5 历史 api 在 ajax 请求发生时保存状态 并且如果用户请求没有 ajax 请求的同一页面 我会提供完整的 html 内容 浏览器的 重新打开最后关闭的选项卡 功能会带来最后的 ajax 请求内容 而无需访问服
  • 为什么错误处理在 Nodemailer 中不起作用?

    我正在尝试使用 nodemailer 设置一个非常简单的联系表单 它工作正常 但我的问题是它不处理错误 如果引发错误 页面应该重定向 但重定向不会发生并且应用程序停止运行 我一生都无法弄清楚为什么会发生这种情况 这是我的代码 if req
  • d3.js:有限制的平移

    我正在研究具有平移功能的基本线性图表 我设法通过限制图表元素的拖动范围d3 event translate values var tx Math max 0 d3 event translate 0 ty Math min 0 d3 eve
  • 创建 Hermetic Maven 构建

    我正在尝试创建一种可以实现密封构建的方法 同时仍然依赖于项目中的 SNAPSHOT 依赖项 出于示例的目的 假设我有一个项目 其依赖结构如下 other 1 2 SNAPSHOT mine 1 2 3 thing 3 1 SNAPSHOT
  • 在 PyQT5 中创建自定义小部件

    我想知道如何在 pyqt 中创建自定义小部件 我见过许多不同的 C 示例 以及一些 pyqt 的非描述性示例 但没有任何内容真正解释如何执行和实现它 特别是没有任何示例基本上不仅仅是修改后的 qt designer 输出 而且我正在从头开始
  • 如何从 HttpClient 响应访问标头? (角/离子)

    我使用的登录端点返回不记名令牌作为响应标头 正如我在 网络 Chrome 检查窗口中看到的那样 Response Headers Access Control Allow Credentials true Access Control Al
  • 向 TIdHttp 请求添加自定义标头,标头值包含逗号

    我正在使用 Delphi XE2 和 Indy 10 5 8 0 我有一个 TIdHttp 实例 我需要向请求添加自定义标头 标头值中包含逗号 因此它会自动解析为多个标头 我不希望它这样做 我需要自定义标头的标头值仍然是一个字符串 而不是根
  • 在基于Web的Spring范围中使用Thymeleaf处理HTML文件并将处理后的模板存储为字符串

    我正在尝试使用 thymeleaf 渲染 HTML 文件 并将生成的 HTML 内容保存在 String 变量中web based scopes of Spring这样我以后就可以用它来发送电子邮件或将内容转换为 pdf 我已经完成了中给出
  • 查找字符串中最短的重复模式

    我想知道是否有办法在 Octave Matlab 中进行模式匹配 我知道 Maple 10 有执行此操作的命令 但不确定我需要在 Octave Matlab 中做什么 所以如果一个数字是12341234123412341234模式匹配将是1
  • 为什么 &[T] 参数也接受 &Vec

    我正在阅读 Rust 书 即迷你grep项目 在那里我遇到了以下片段 fn main let args Vec
  • 计算文件中单词数的最简单方法

    我正在尝试以最简单的方式编写一个程序来计算 Scala 语言文件中单词出现的次数 到目前为止我有这些代码 import scala io Codec string2codec import scala io Source import sc
  • 在遍历表达式时提取实例变量的当前值

    我目前正在尝试编写一些将 C 表达式转换为文本的代码 为此 我不仅需要遍历表达式树 还需要评估其中的一小部分 以获得局部变量的当前值 我发现很难用语言来表达 所以这里是伪代码 缺少的部分在第一种方法中 public class Progra
  • 如何将时间戳转换为可读的日期/时间?

    我有一个 APIresult像这样给出时间戳1447804800000 如何使用 Javascript jQuery 将其转换为可读格式 您可以使用以下命令将其转换为可读日期new Date method 如果有特定的日期戳 可以通过以下方
  • 我应该如何理解 dis.dis 的输出?

    我想了解如何使用dis Python字节码的反汇编器 具体来说 应该如何解释输出dis dis or dis disassemble 这是一个非常具体的示例 在 Python 2 7 3 中 dis dis heapq nsmallest
  • 克隆整个对象图

    使用此代码序列化对象时 public object Clone var serializer new DataContractSerializer GetType using var ms new System IO MemoryStrea
  • (Hadoop) MapReduce - 链作业 - JobControl 不会停止

    我需要链接两个 MapReduce 作业 我使用 JobControl 将 job2 设置为依赖于 job1 它有效 输出文件已创建 但它并没有停止 在 shell 中它保持这种状态 12 09 11 19 06 24 WARN mapre