Java MapReduce 按日期计数

2024-01-07

我是 Hadoop 的新手,我正在尝试编写一个 MapReduce 程序,以按日期(按月分组)计算最多出现的前两次选集。所以我的输入是这样的:

2017-06-01 , A, B, A, C, B, E, F 
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E 
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

所以,我期待这个 MapReduce 程序的结果,比如:

2017-06,  A:4, E:4
2017-07,  A:4, B:4
public class ArrayGiulioTest {

    public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

    public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            TextWritable array = new TextWritable();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            String dataAttuale = tokenizer.nextToken().substring(0,
                    line.lastIndexOf("-"));

            Text tmp = null;
            Text[] tmpArray = new Text[tokenizer.countTokens()];
            int i = 0;
            while (tokenizer.hasMoreTokens()) {
                String prod = tokenizer.nextToken(",");

                word.set(dataAttuale);
                tmp = new Text(prod);
                tmpArray[i] = tmp;

                i++;
            }

            array.set(tmpArray);

            context.write(word, array);

        }
    }

    public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {


        public void reduce(Text key, Iterator<TextWritable> values,
                Context context) throws IOException, InterruptedException {

            MapWritable map = new MapWritable();
            Text txt = new Text();

            while (values.hasNext()) {
                TextWritable array = values.next();
                Text[] tmpArray = (Text[]) array.toArray();
                for(Text t : tmpArray) {
                    if(map.get(t)!= null) {
                        IntWritable val = (IntWritable) map.get(t);
                        map.put(t, new IntWritable(val.get()+1));
                    } else {
                        map.put(t, new IntWritable(1));
                    }
                }

            }

            Set<Writable> set = map.keySet();
            StringBuffer str = new StringBuffer();
            for(Writable k : set) {

                str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
            }
            txt.set(str.toString());


            context.write(key, txt);
        }
    }

    public static void main(String[] args) throws Exception {
        long inizio = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "countProduct");
        job.setJarByClass(ArrayGiulioTest.class);

        job.setMapperClass(CustomMap.class);
        //job.setCombinerClass(CustomReduce.class);
        job.setReducerClass(CustomReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TextWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        long fine = System.currentTimeMillis();
        logger.info("**************************************End" + (End-Start));
        System.exit(1);
    }

}

我已经以这种方式实现了我的自定义 TextWritable :

public class TextWritable extends ArrayWritable {


    public TextWritable() {
        super(Text.class);
    }
}

..所以当我运行我的 MapReduce 程序时我得到了这样的结果

2017-6    wordcount.TextWritable@3e960865
2017-6    wordcount.TextWritable@3e960865

很明显我的减速机不起作用。这似乎是我的映射器的输出

任何想法?有人可以说这是否是解决问题的正确途径?

这里是控制台日志(仅供参考,我的输入文件有 6 行而不是 5 行) *我在 eclipse(mono JVM) 下启动 MapReduce 问题或使用 Hadoop 和 Hdfs 获得相同的结果

File System Counters
    FILE: Number of bytes read=1216
    FILE: Number of bytes written=431465
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=6
    Map output records=6
    Map output bytes=214
    Map output materialized bytes=232
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=232
    Reduce input records=6
    Reduce output records=6
    Spilled Records=12
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=394264576
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=208
File Output Format Counters 
    Bytes Written=1813

我认为你试图在映射器中做太多的工作。您只需要对日期进行分组(看起来您无论如何都没有根据预期的输出正确地格式化它们)。

例如,以下方法将改变这些线

2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

进入这对减速机

2017-07 , ("A,B,A,C,B,E,F", "A,B,A,G,B,G,G")

换句话说,您不会通过使用ArrayWritable,只需将其保留为文本即可。


所以,映射器看起来像这样

class CustomMap extends Mapper<LongWritable, Text, Text, Text> {

    private final Text key = new Text();
    private final Text output = new Text();

    @Override
    protected void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException {

        int separatorIndex = value.find(",");

        final String valueStr = value.toString();
        if (separatorIndex < 0) {
            System.err.printf("mapper: not enough records for %s", valueStr);
            return;
        }
        String dateKey = valueStr.substring(0, separatorIndex).trim();
        String tokens = valueStr.substring(1 + separatorIndex).trim().replaceAll("\\p{Space}", "");

        SimpleDateFormat fmtFrom = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat fmtTo = new SimpleDateFormat("yyyy-MM");

        try {
            dateKey = fmtTo.format(fmtFrom.parse(dateKey));
            key.set(dateKey);
        } catch (ParseException ex) {
            System.err.printf("mapper: invalid key format %s", dateKey);
            return;
        }

        output.set(tokens);
        context.write(key, output);
    }
}

然后reducer可以构建一个Map来收集值字符串中的值并对其进行计数。再次,只写出文本。

class CustomReduce extends Reducer<Text, Text, Text, Text> {

    private final Text output = new Text();

    @Override
    protected void reduce(Text date, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        Map<String, Integer> keyMap = new TreeMap<>();
        for (Text v : values) {
            String[] keys = v.toString().trim().split(",");

            for (String key : keys) {
                if (!keyMap.containsKey(key)) {
                    keyMap.put(key, 0);
                }
                keyMap.put(key, 1 + keyMap.get(key));
            }
        }

        output.set(mapToString(keyMap));
        context.write(date, output);
    }

    private String mapToString(Map<String, Integer> map) {
        StringBuilder sb = new StringBuilder();
        String delimiter = ", ";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sb.append(
                    String.format("%s:%d", entry.getKey(), entry.getValue())
            ).append(delimiter);
        }
        sb.setLength(sb.length()-delimiter.length());
        return sb.toString();
    }
}

根据您的输入,我得到了这个

2017-06 A:4, B:4, C:1, E:4, F:3, K:1, Q:2, R:1, T:1
2017-07 A:4, B:4, C:1, E:1, F:1, G:3
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java MapReduce 按日期计数 的相关文章

  • 我是否需要安装 SQLite 才能使 SQLiteJDBC 正常工作?

    我想我只是没有 明白 如果我的计算机上尚未安装 SQLite 并且我想编写一个使用嵌入式数据库的 Java 应用程序 并且我将 SQLiteJDBC JAR 下载 导入到我的项目中 那么这就是我所需要的吗 或者 我是否需要先安装 SQLit
  • Netbeans 8.1 Gnome 3 GTK+ UI 字体和选项卡高度

    我刚刚在运行 GNOME 3 桌面的 Ubuntu 16 04 上安装了 NetBeans 8 1 如果可能的话 我想继续使用 IDE 的 GTK 外观和感觉 但 UI 上的字体 尤其是选项卡中的字体 太小且重叠 我尝试添加 fontsiz
  • 带路径压缩算法的加权 Quick-Union

    有一种 带路径压缩的加权快速联合 算法 代码 public class WeightedQU private int id private int iz public WeightedQU int N id new int N iz new
  • 垃圾收集器如何在幕后工作来收集死对象?

    我正在阅读有关垃圾收集的内容 众所周知 垃圾收集会收集死亡对象并回收内存 我的问题是 Collector 如何知道任何对象已死亡 它使用什么数据结构来跟踪活动对象 我正在研究这个问题 我发现GC实际上会跟踪活动对象 并标记它们 每个未标记的
  • 什么是抽象类? [复制]

    这个问题在这里已经有答案了 当我了解抽象类时 我说 WT H 问题 创建一个无法实例化的类有什么意义呢 为什么有人想要这样的课程 什么情况下需要抽象类 如果你明白我的意思 最常见的是用作基类或接口 某些语言有单独的interface构建 有
  • 如何在 Java 中向时间戳添加/减去时区偏移量?

    我正在使用 JDK 8 并且玩过ZonedDateTime and Timestamp很多 但我仍然无法解决我面临的问题 假设我得到了格式化的Timestamp在格林威治标准时间 UTC 我的服务器位于某处 假设它设置为Asia Calcu
  • 在 Java 中如何找出哪个对象打开了文件?

    我需要找出答案哪个对象在我的 Java 应用程序中打开了一个文件 这是为了调试 因此欢迎使用工具或实用程序 如果发现哪个对象太具体了 这class也会很有帮助 这可能很棘手 您可以从使用分析器开始 例如VisualVM http visua
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • 将人类日期(当地时间 GMT)转​​换为日期

    我正在服务器上工作 服务器正在向我发送 GMT 本地日期的日期 例如Fri Jun 22 09 29 29 NPT 2018在字符串格式上 我将其转换为日期 如下所示 SimpleDateFormat simpleDateFormat ne
  • 如何在.NET中使用java.util.zip.Deflater解压缩放气流?

    之后我有一个转储java util zip Deflater 可以确认它是有效的 因为 Java 的Inflater打开它很好 并且需要在 NET中打开它 byte content ReadSample sampleName var inp
  • 蓝牙发送和接收文本数据

    我是 Android 开发新手 我想制作一个使用蓝牙发送和接收文本的应用程序 我得到了有关发送文本的所有内容逻辑工作 但是当我尝试在手机中测试它时 我看不到界面 这是Main Activity Code import android sup
  • 如何在JPanel中设置背景图片

    你好 我使用 JPanel 作为我的框架的容器 然后我真的想在我的面板中使用背景图片 我真的需要帮助 这是我到目前为止的代码 这是更新 请检查这里是我的代码 import java awt import javax swing import
  • 使用 Elastic Beanstalk 进行 Logback

    我在使用 Elastic Beanstalk 记录应用程序日志时遇到问题 我正在 AWS Elastic Beanstalk 上的 Tomcat 8 5 with Corretto 11 running on 64bit Amazon Li
  • 如何区分从 Saxon XPathSelector 返回的属性节点和元素节点

    给定 XML
  • 为什么\0在java中不同系统中打印不同的输出

    下面的代码在不同的系统中打印不同的输出 String s hello vsrd replace 0 System out println s 当我在我的系统中尝试时 Linux Ubuntu Netbeans 7 1 它打印 When I
  • 使用 HtmlUnit 定位弹出窗口

    我正在构建一个登录网站并抓取一些数据的程序 登录表单是一个弹出窗口 所以我需要访问这个www betexplorer com网站 在页面的右上角有一个登录链接 写着 登录 我单击该链接 然后出现登录弹出表单 我能够找到顶部的登录链接 但找不
  • 将 JScrollPane 添加到 JFrame

    我有一个关于向 Java 框架添加组件的问题 我有一个带有两个按钮的 JPanel 和一个添加了 JTable 的 JScrollPane 我想将这两个添加到 JFrame 中 我可以将 JPanel 添加到 JFrame 或将 JScro
  • java 中的蓝牙 (J2SE)

    我是蓝牙新手 这就是我想做的事情 我想获取连接到我的电脑上的蓝牙的设备信息并将该信息写入文件中 我应该使用哪个 api 以及如何实现 我遇到了 bluecove 但经过几次搜索 我发现 bluecove 不能在 64 位电脑上运行 我现在应
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • 由 Servlet 容器提供服务的 WebSocket

    上周我研究了 WebSockets 并对如何使用 Java Servlet API 实现服务器端进行了一些思考 我没有花费太多时间 但在使用 Tomcat 进行一些测试时遇到了以下问题 如果不修补容器或至少对 HttpServletResp

随机推荐