java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

2024-04-24

我正在尝试实现reduce side join,并使用mapfile reader来查找分布式缓存,但在stderr中检查时它没有查找值,它显示以下错误,lookupfile文件已经存在于hdfs中,并且似乎已正确加载进入缓存,如标准输出中所示。

java.lang.IllegalArgumentException:错误的 FS: 文件:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, 预期:hdfs://localhost:9000 org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390) 在 org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140) 在 org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554) 在 org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816) 处 org.apache.hadoop.io.SequenceFile$Reader。(SequenceFile.java:1479) org.apache.hadoop.io.SequenceFile$Reader。(SequenceFile.java:1474) org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302) 在 org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284) 处 org.apache.hadoop.io.MapFile$Reader.(MapFile.java:273) 在 org.apache.hadoop.io.MapFile$Reader.(MapFile.java:260) 在 org.apache.hadoop.io.MapFile$Reader.(MapFile.java:253) 在 mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59) 在 mr_poc.reducerrsj.setup(reducerrsj.java:42) 在 org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174) 在 org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) 在 org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) 处 org.apache.hadoop.mapred.Child$4.run(Child.java:255) 在 java.security.AccessController.doPrivileged(本机方法)位于 javax.security.auth.Subject.doAs(Subject.java:416) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) 在 org.apache.hadoop.mapred.Child.main(Child.java:249) java.lang.NullPointerException 位于 mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83) 在 mr_poc.reducerrsj.reduce(reducerrsj.java:127) 在 mr_poc.reducerrsj.reduce(reducerrsj.java:1) 在 org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) 在 org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) 在 org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) 处 org.apache.hadoop.mapred.Child$4.run(Child.java:255) 在 java.security.AccessController.doPrivileged(本机方法)位于 javax.security.auth.Subject.doAs(Subject.java:416) 在 org.apache.hadoop.security。

这是我的驱动程序代码,

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
if(arg.length != 3)
{
    System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
    return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
job.setJarByClass(driverrsj.class);
conf.setInt("cust_info", 1);
conf.setInt("status", 2);
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(arg[2]));
job.setJarByClass(driverrsj.class);
job.setMapperClass(mappperRSJ.class);
job.setReducerClass(reducerrsj.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
//job.setPartitionerClass(partinonrsj.class);
job.setSortComparatorClass(secondarysortcomp.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);


boolean success =job.waitForCompletion(true);
return success? 0 : 1;

    }
    
    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);
        
    }
    

}

这是我的减速器代码

包 mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;
    
    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        
        
        
        for ( Path eachPath : cacheFiles){
            
            System.out.println(eachPath.toString());
              System.out.println(eachPath.getName());
            if(eachPath.getName().toString().contains("delivery_status"))
            {
                  
                URI uriUncompressedFile = new File(eachPath.toString()+ "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);
            
            }
            }
        }
    
    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
    throws IOException {
    // {{
    // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
    try {
        
        
    deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());
    } catch (Exception e) {
    e.printStackTrace();
    }
    // }}
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {
             
            if (key.getsourceindex() == 2) {
            
             
            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);
            
            
            try {
                
            deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            }
             catch (Exception e) {
            txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
            txtMapFileLookupValue
            .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
            .equals("")) ? "NOT-FOUND"
            : txtMapFileLookupValue.toString());
            }
        
            reduceValueBuilder.append(txtMapFileLookupValue.toString());
            
             
            } else if(key.getsourceindex() == 1) {
        
            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
            strSeparator);
            } 
            
             
            
            txtMapFileLookupKey.set("");
            txtMapFileLookupValue.set("");
            
            return reduceValueBuilder;
    }
     
    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
    Context context) throws IOException, InterruptedException {
     
    
    for (Text value : values) {
    buildOutputValue(key, reduceValueBuilder, value);
    }
     
    // Drop last comma, set value, and emit output
    if (reduceValueBuilder.length() > 1) {
     
    //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
    // Emit output
    reduceOutputValue.set(reduceValueBuilder.toString());
    context.write(nullWritableKey, reduceOutputValue);
    } else {
    System.out.println("Key=" + key.getjoinkey() + "src="
    + key.getsourceindex());
     
    }
    // Reset variables
    reduceValueBuilder.setLength(0);
    reduceOutputValue.set("");
     
    }
    @Override
    protected void cleanup(Context context) throws IOException,
    InterruptedException {
         if(deptMapReader != null)
         {
deptMapReader.close();
    }
    }
}

这是我的 core-site-Xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

任何帮助将不胜感激。提前致谢!!!


我遇到了同样的问题,我通过添加解决了这个问题

FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"),conf)

在驾驶员级别。

你必须导入URIfrom java.net.URI

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000 的相关文章

  • OSGI - 处理捆绑包所需的第 3 方 JAR

    我刚刚开始 OSGI 开发 正在努力了解如何最好地处理依赖的 JAR 也就是说 如果我要创建一个捆绑包 我很可能需要使用一些第 3 方 JAR 当我创建要部署到 OSGI 的捆绑包 JAR 时 显然不包含这些第 3 方 JAR 因此该捆绑包
  • java中使用awt.Toolkit和Clipboard是否可以知道剪贴板中复制的内容是否是mp3文件

    我正在尝试编写一个运行于背景 and monitors复制 a 的复制操作 mp3 file or a 文件夹包含 a mp3 file Clipboard cb Toolkit getDefaultToolkit getSystemCli
  • 从大表中检索所有记录时如何避免 OOM(内存不足)错误?

    我的任务是将一个巨大的表转换为自定义 XML 文件 我将使用 Java 来完成这项工作 如果我只是发出 SELECT FROM customer 它可能会返回大量数据 最终导致 OOM 我想知道 有没有一种方法可以在记录可用后立即处理该记录
  • 在自动触发的默认侦听器之前触发 Hibernate 自定义事件侦听器

    我创建了一个自定义 Hibernate 事件监听器 扩展了 org hibernate event PreInsertEventListener 自定义侦听器会重写 onPreInsert 方法 并在使用 DAO 将 联系人 实体保存到数据
  • java:查找数组中整数的频率

    我需要开发一个java要求用户输入一些内容的程序integers并找出最大和最小的数 以及这些数的平均值 然后 划分数组的集合分成若干子区间用户指定的 然后它生成一个边界点 每个边界点的长度为子区间宽度 问题是我需要创建一个频率 例如 间隔
  • 如何使用流对 Map 中的值求和?

    我想要与流等效的内容 public static
  • Hibernate/JPA 在启动时不验证数据库架构

    由于某种原因 hibernate 无法捕获诸如将实体映射到不存在的表之类的问题 我的 persistence xml 文件看起来像这样
  • 为什么在步骤中将 TransactionManager 设置为 JPATransactionManager 不正确?

    我正在使用 Spring Batch 和 JPA 并且经历了 TransactionManager bean 冲突 我通过在步骤中将 TransactionManager 设置为 JpaTransactionManager 找到了解决方案
  • HK2 MethodInterceptor 与 Jersey 资源

    如何设置aopMethodInterceptor使用泽西岛资源 这是我尝试过的 如下this https hk2 java net 2 2 0 aop example html文档 第 1 步 拦截服务 public class MyInt
  • 在节点上生成 AES 密钥

    我正在处理一个使用自定义协议来加密通信的遗留应用程序 随机 AES 密钥在旧版 Java 应用程序中生成 如下所示 keygen KeyGenerator getInstance AES keygen init 128 keygen gen
  • 从MySQL php中的特定列获取最大ID和最小ID

    我是新来的php现在尝试从中检索数据MySQL到安卓 这是我的工作细节 table In 检索总小时数函数 我想检索最短 ID 时间 and 最大 ID 超时 from MySQL到安卓通过php最后使用下面的代码来获取总小时数 假设 ID
  • 能够存储微秒的 Date 对象

    我正在寻找一个能够存储到微秒粒度的 Date 对象 有人知道吗 标准Date对象仅存储到毫秒 我知道这是平台限制 我可以通过包装来解决这个问题Date加上自定义类别中的小数数量 然而 我希望避免编写一个带有适当计算等的内容 我需要解析一个b
  • 来自公共字符串的 Android RSA 加密

    我正在开发一个 Android 应用程序 我希望用户能够使用其他人的公钥加密消息 系统将生成公钥 私钥对 然后可以将消息秘密发送给其他用户 我正在创建一个加密类 它将处理消息的加密 解密 不幸的是我遇到了一些问题 在这种方法中 我想传递用户
  • 如何在另一个 Gui 中启动 JADE Gui?

    如何在另一个 Gui 中启动 JADE Gui 假设我的 Gui 上有一个按钮 点击该按钮后 JADE Gui 将启动 这可能吗 如果是 怎么办 提前致谢 Regards 我假设 JADE Gui 你指的是 JADERMA http jad
  • 使用 Android 将文本文件上传到 Google Drive

    编辑 我已将文本设置为字符串 如下所示 字符串文本 你好 我想将其转换为纯文本文件 然后上传到 Google 云端硬盘文件夹 我已经尝试过下面的代码 但它不完整 所以我无法说出出现了什么错误 我正在使用 Google Drive 快速启动
  • 如何使用 Solr 索引 pdf 内容?

    我正在尝试使用 SolrJ 索引一些 pdf 文档 如下所述http wiki apache org solr ContentStreamUpdateRequestExample http wiki apache org solr Cont
  • java.lang.String 无法转换为 org.json.simple.JSONObject simple-json

    我在尝试使用 google 的 simple json 解析简单的 json 时遇到奇怪的问题 这是我的代码 它不起作用 String s args 0 toString JSONObject json JSONObject new JSO
  • 如何在 jformattedtextfield 中仅添加双精度值

    我需要格式化 jformattedtextfield 以便在运行时仅添加具有两位小数的双精度 浮点值 例如 15600 00 请帮我解决这个问题 谢谢 以下是有关如何执行此操作的示例 NumberFormat format DecimalF
  • 如何执行带有参数的命令?

    如何在 Java 中执行带有参数的命令 我试过了 Process p Runtime getRuntime exec new String php var www script php m 2 这是行不通的 String options n
  • 在java中使用共享密钥加密/解密?

    我有客户令牌 我正在从一个 Web 应用程序 如 app1 发送到另一个 Web 应用程序 如 app2 我想加密客户令牌 在 app1 上 并使用在 app1 和 app2 上共享的密钥在 app2 上对其进行解密 我不知道如何开始 这将

随机推荐

  • float 和 double 精度相关的概念

    为什么精度float小数点后最多 6 位 精度double小数点后最多15位 任何人都可以给一个数学解释 of it 说一下精度float or double是一些小数位数是草率的术语 float and double通常使用 IEEE 7
  • 输入具有最小和最大数字的值

    下面是一个输入数字表单 我使用 JavaScript 添加了一些代码 其中可写入的最小数字为 1 最大可写入数字为 50 当有人尝试输入任何小于 1 且大于 50 的数字时 它会自动将其替换为数字 1 或 50 但我没有成功实现此目标 我需
  • 为什么一个简单的 get 语句这么慢?

    几年前 我在学校接到一项作业 必须并行化光线追踪器 这是一项简单的任务 我真的很喜欢做它 今天 我想对光线追踪器进行分析 看看是否可以让它运行得更快 无需完全修改代码 在分析过程中 我注意到一些有趣的事情 Sphere Intersect
  • 使用 rMarkdown 自动生成报告

    我试图在 rMarkdown 中使用相同的模板生成大约 50 份报告 我不想每次都更改输入文件的名称 并且我想为输出文件选择不同的名称 有什么办法可以自动化这个过程吗 谢谢 另一种选择是在单独的 R 脚本中使用 rmarkdown 包的 r
  • Python OpenCV cv.WaitKey 在 Ubuntu 模 256 映射上正确返回奇怪的输出

    我正在使用 OpenCV 2 2 运行 Ubuntu 11 10 Lenovo T400 我相信导入是通过 import cv2 cv as cv 完成的 如果我只是 导入简历 也会发生这个问题 我最近开始遇到这个问题 这有点奇怪 我不知道
  • 使用 EF6(实体框架 6)编写单元测试

    我有一个使用 NET Framework 4 6 1 和 EF6 的 ASP NET Core 项目 现在我想编写一些单元测试 并且已经花了几个小时来配置内存 SQLite 数据库以使用 EF6 但这不起作用 所以 问题是如何使用 EF6
  • 如何在C++中读取一个字节并将字节的ASCII值保存为整数

    我有一个简单的问题让我困惑 Goal 我想从文件中读取给定的字节 比如第一个字节 并用该字节的 ASCII 值创建 int x 因此 例如 如果字节 字符是 a 我希望 x 为 97 十六进制的 61 我有以下读取文件 example tx
  • ANDROID - 在列表视图中获取选定的 id 评级栏

    我试图找出如何获取所选的 idratingBar in ListView在网上 但大多数人都使用ListViewAdapter or RatingAdapter在另一堂课上 我不知道该怎么做 因为我还不知道 所以我所有的课程都在MainAc
  • 通过 IPython 使用 Jython:readline 仍然是一个问题吗?

    我想将 Jython 解释器与 IPython 一起使用 这样我就可以使用制表符补全之类的东西 也许还可以使用 IPython 笔记本 这IPython 常见问题解答网站 http ipython org faq html围绕这是否可行采取
  • React Navigation - setOptions() headerRight 回调中的访问状态

    我凌驾于一切之上反应导航 https reactnavigation org headerRight内部带有自定义按钮的选项React useEffect 当按下按钮时 我需要访问状态name但我得到的值不是当前的值 const name
  • 强制设置核心数据检查点?

    我编写了一个通过 Core Data 搅动大量数据的应用程序 用户在后台退出应用程序后 我会清理这些数据 由于 WAL 检查点似乎是导致 UI 暂停的主要原因 因此我还想强制使用 WAL 检查点 是的 我知道创建第二个核心数据堆栈 这也将完
  • 手动启动 SharePoint 计时器作业

    我想手动调用安装在 SharePoint 服务器上的计时器作业 有用的是类似于 stsadm 命令的东西 我的场景是 我已将具有大量功能的解决方案部署到客户服务器 我不想等待每周的时间表来启动特定的计时器工作 我想输入一个命令来立即运行特定
  • 为什么after_find和after_initialize事件的回调要将它们定义为方法?

    定义 after find 和 after initialize 事件回调的唯一方法是将它们定义为方法 如果您尝试使用第二种技术将它们声明为处理程序 它们将被默默地忽略 有人能解释一下为什么会这样吗 为什么专门针对这两个回调呢 EDIT 摘
  • XML 和 Python:获取根元素中声明的命名空间

    如何访问多个xmlnsXML 树根元素的声明 例如 import xml etree cElementTree as ET data
  • 使用 load_model 加载经过训练的tensorflow.keras模型会返回JSON解码错误,而未经训练的模型加载正常

    我有一个训练有素的 Keras 模型 使用 tensorflow keras API 构建和训练 并使用tf keras save model 没有可选参数的方法 Tensorflow 是最新的 我的 Python 版本是 3 8 根据我的
  • 在 UI 线程上创建并启动任务

    当在工作线程上调用的方法需要在 UI 线程上运行代码并等待其完成后再执行其他操作时 可以这样做 public int RunOnUi Func
  • 每个环境使用不同的 URL 运行相同的 Testcafe 测试

    我正在研究 TestCafe 概念验证 我在一个测试环境中进行了一些测试 我需要一种方法来在最多 3 个具有不同 URL 的不同测试环境中运行相同的测试 对于这种情况有最佳实践吗 解决方案是在 testcafe 命令行上添加自定义选项 例如
  • Delphi 中的 DOMElement

    我如何在 DOMNodeList 对象中使用 getElementsByTagName 喜欢 procedure TForm1 selecionarClick Sender TObject var DOMDocument iXMLDOMDo
  • 如何子类化UIApplication?

    iPhone Reference Libary UIApplication 说我可以子类化 UIApplication 但如果我尝试这样做 我会得到一个异常 Terminating app due to uncaught exception
  • java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

    我正在尝试实现reduce side join 并使用mapfile reader来查找分布式缓存 但在stderr中检查时它没有查找值 它显示以下错误 lookupfile文件已经存在于hdfs中 并且似乎已正确加载进入缓存 如标准输出中