MapReduce之KNN算法

2023-11-02

MapReduce之KNN算法

什么是 K K K-邻近算法(KNN)

KNN分类问题是找出一个数据集中与一个给定查询数据点最近的 k k k个数据点。这个操作也称KNN连接。定义为:给定两个数据集 R R R S S S,对于 R R R中的每一个对象,希望从 S S S中找到 k k k个最近的相邻对象。其中 R R R为查询数据集, S S S为训练数据集

KNN分类

KNN的中心思想为建立一个分类方法,使得对于将 y y y(响应变量)与 x x x(预测变量)关联的“平滑“函数 f f f的形式没有任何假设:
     x = ( x 1 , x 2 , … , x n ) x=(x_1,x_2,\dots,x_n) x=(x1,x2,,xn)
     y = f ( x ) y=f(x) y=f(x)
函数 f f f是非参数化的,在KNN中,给定一个新的点 p = ( p 1 , p 2 , … , p n ) p=(p_1,p_2,\dots,p_n) p=(p1,p2,,pn),要动态识别训练数据集中与 p p p相似的 k k k个观察( k k k个邻近)。近邻由一个距离或不相似度来定义。通过计算查询对象与所有训练数据对象之间的欧氏距离,然后将这个查询对象分配到 k k k个最近的训练数据中大多数对象所在的类。因为要计算每个对象之间的距离,所以所有数据类型必须为double。

KNN距离函数

给定如下两个 n n n维对象 X X X Y Y Y
     X = ( X 1 , X 2 , … , X n ) X=(X_1,X_2,\dots,X_n) X=(X1,X2,,Xn)
     Y = ( Y 1 , Y 2 , … , Y n ) Y=(Y_1,Y_2,\dots,Y_n) Y=(Y1,Y2,,Yn)
欧氏距离
   d i s t a n c e ( X , Y ) = ∑ i = 1 n ( X i − Y i ) 2 distance(X,Y)=\sqrt{\sum_{i=1}^n(X_i-Y_i)^2} distance(X,Y)=i=1n(XiYi)2
曼哈顿距离
   d i s t a n c e ( X , Y ) = ∑ i = 1 n ∣ X i − Y i ∣ distance(X,Y)=\sum_{i=1}^n \vert X_i-Y_i \vert distance(X,Y)=i=1nXiYi
闵可夫斯基距离
   ( ∑ i = 1 n ( ∣ X i − Y i ∣ ) q ) 1 / q (\sqrt { \sum_{i=1}^n(\vert X_i-Y_i \vert })^q)^{1/q} (i=1n(XiYi )q)1/q

KNN算法非形式化描述

KNN算法可以总结为以下的简单步骤:

  • 1、确定 k k k k k k取决于具体需求)
  • 2、计算新输入与所有训练数据之间的距离
  • 3、对距离进行排序,并根据第 k k k个最小距离确定 k k k个近邻
  • 4、收集这些近邻所属的类别
  • 5、根据多数投票确定新输入数据类别
MapReduce解决方案

在理解了KNN算法的步骤之后,理解MapReduce方案就简单了,在映射器运行之前将训练集中的数据读取出来,接下来通过计算每条数据与训练集数据中的距离,对距离进行排序,根据多数投票原则确定新输入数据类别,整个操作过程使用映射器即可实现。

输入数据

S.txt文件如下

100;c1;1.0,1.0
101;c1;1.1,1.2
102;c1;1.2,1.0
103;c1;1.6,1.5
104;c1;1.3,1.7
105;c1;2.0,2.1
106;c1;2.0,2.2
107;c1;2.3,2.3
208;c2;9.0,9.0
209;c2;9.1,9.2
210;c2;9.2,9.0
211;c2;10.6,10.5
212;c2;10.3,10.7
213;c2;9.6,9.1
214;c2;9.4,10.4
215;c2;10.3,10.3
300;c3;10.0,1.0
301;c3;10.1,1.2
302;c3;10.2,1.0
303;c3;10.6,1.5
304;c3;10.3,1.7
305;c3;1.0,2.1
306;c3;10.0,2.2
307;c3;10.3,2.3

R.txt文件如下:

1000;3.0,3.0
1001;10.1,3.2
1003;2.7,2.7
1004;5.0,5.0
1005;13.1,2.2
1006;12.7,12.7
mapper阶段任务

这个阶段的主要任务两个:

  • 1、读取训练集中的数据
  • 2、计算训练集数据与输入数据距离并根据投票原则实现分类
mapper阶段编码
package com.deng.KNN;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class KNNMapper extends Mapper<LongWritable,Text,Text, Text> {
    private static Text reduceKey;
    private static Text reduceValue;
    private static List<Point> training=null;

    public static List<Point> readTrainingFromHFDS() throws IOException{
        return KNNUtil.readFromHDFS("input/S.txt");
    }
    //从文件系统中读取数据并存入链表中
    public void setup(Context context) throws IOException{
        training=readTrainingFromHFDS();
    }

    public void map(LongWritable key,Text value,Context context){
        String line=value.toString();
        Point query=new Point(line);  //查询数据
        SortedMap<Double,Point> top=new TreeMap<Double, Point>();  //按照距离由小到大存取

        for(int i=0;i<training.size();i++){
            double distance=KNNUtil.calculateEuclidianDistance(query.getVector(),training.get(i).getVector());
            top.put(distance,training.get(i));
            if(top.size()>5){
                top.remove(top.firstKey());
            }
        }
        
        //根据投票原则进行分类,majorityVote为输入数据按照投票原则分类到的祖
        String majorityVote=null;
        int maxCount=0;
        for(Point p:top.values()) {
            p.addCount();
            if (p.getGroupCount() > maxCount) {
                maxCount = p.getGroupCount();
                majorityVote = p.getGroup();
            }
        }

        reduceKey=new Text(query.getGroup());
        reduceValue=new Text(majorityVote);
        try {
            context.write(reduceKey,reduceValue);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

工具类KNNUtil如下
package com.deng.KNN;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

public class KNNUtil {
    //计算两个输入数据欧氏距离
    public static double calculateEuclidianDistance(Vector<Double> query,Vector<Double> training){
        double sum=0.0;
        for(int i=0;i<query.size();i++){
            sum+=Math.pow(training.get(i)-query.get(i),2);
        }
        return sum;
    }
    //从文件系统中读取数据
    public static List<Point> readFromHDFS(String p) throws IOException{
        BufferedReader br=new BufferedReader(new FileReader(p));
        String str;
        int k=0;
        List<Point> points=new ArrayList<>();

        while((str=br.readLine())!=null){
            Point point=new Point(str);
            System.out.println(point);
            points.add(point);
        }
        br.close();

        return points;
    }

}

自定义类point如下
package com.deng.KNN;

import java.util.Vector;

public class Point {
    private String group;
    private Integer groupCount;
    private Vector<Double> vector=new Vector<>();

    public Point(){}

    public Point(String s){
        // 输入数据中,训练集数据和输入数据输入格式不同,利用长度来进行区分并标记
        String[] line=s.split(";");
        if(line.length==3){
            group=line[1];
            String[] tokens=line[2].split(",");
            for(int i=0;i<tokens.length;i++){
                vector.add(Double.parseDouble(tokens[i]));
            }
        }else{
            group=line[0];
            String[] tokens=line[1].split(",");
            for(int i=0;i<tokens.length;i++){
                vector.add(Double.parseDouble(tokens[i]));
            }
        }
        groupCount=0;
    }

    public String getGroup() {
        return group;
    }

    public Vector<Double> getVector() {
        return vector;
    }

    public Integer getGroupCount() {
        return groupCount;
    }
    //封装加法操作
    public void addCount(){
        this.groupCount++;
    }

    @Override
    public String toString() {
        return "Point{" +
                "group='" + group + '\'' +
                ", vector=" + vector +
                '}';
    }
}

驱动程序如下
package com.deng.KNN;

import com.deng.util.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class KNNDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        FileUtil.deleteDirs("output");
        String[] otherArgs=new String[]{"input/R.txt","output"};
        Configuration conf=new Configuration();
        Job job=new Job(conf,"KNN");
        job.setJarByClass(KNNDriver.class);
        job.setMapperClass(KNNMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
        System.exit((job.waitForCompletion(true)?0:1));
    }
}

运行结果如下

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce之KNN算法 的相关文章

  • 将 MapReduce 作业的输出记录到文本文件

    我一直在使用这个 jobclient monitorandprintjob 方法将映射缩减作业的输出打印到控制台 我的用法是这样的 job client monitorAndPrintJob job conf job client getJ
  • 解析数百万个小 XML 文件

    我有 1000 万个小 XML 文件 300KB 500KB 我在 Mapreduce 中使用 Mahaout 的 XML 输入格式来读取数据 并使用 SAX 解析器进行解析 但处理速度非常慢 使用输入文件的压缩 lzo 有助于提高性能吗
  • 在spark中设置textinputformat.record.delimiter

    在 Spark 中 可以设置一些 hadoop 配置设置 例如 System setProperty spark hadoop dfs replication 1 这有效 复制因子设置为 1 假设是这种情况 我认为这种模式 在常规 hado
  • Apache Pig:无法运行我自己的pig.jar 和pig-withouthadoop.jar

    我有一个运行 Hadoop 0 20 2 和 Pig 0 10 的集群 我有兴趣向 Pig 的源代码添加一些日志 并在集群上运行我自己的 Pig 版本 我做了什么 使用 ant 命令构建项目 有pig jar和pig without had
  • 我如何调试 Hadoop MapReduce [重复]

    这个问题在这里已经有答案了 我正在尝试构建一个地图缩减作业 它运行完成 但最后呈现奇怪的数据 当我尝试使用 system out println debug data 调试它时 它没有显示在屏幕上 使用 java API 生成外部日志文件
  • 我的 cdh5.2 集群在运行 hbase MR 作业时出现 FileNotFoundException

    我的 cdh5 2 集群运行 hbase MR 作业时出现问题 例如 我将 hbase 类路径添加到 hadoop 类路径中 vi etc hadoop conf hadoop env sh 添加行 export HADOOP CLASSP
  • Hadoop MapReduce:可以在一个 hadoop 作业类中定义两个映射器和缩减器吗?

    我有两个独立的 java 类 用于执行两个不同的 MapReduce 作业 我可以独立运行它们 对于这两个作业 它们所操作的输入文件是相同的 所以我的问题是是否可以在一个java类中定义两个映射器和两个缩减器 例如 mapper1 clas
  • Hadoop MapReduce 提供嵌套目录作为作业输入

    我正在从事一项处理嵌套目录结构的工作 其中包含多个级别的文件 one three four baz txt bleh txt foo txt two bar txt gaa txt 当我添加one 作为输入路径 不会处理任何文件 因为没有文
  • MongoDB 从两个数组计算值、排序和限制

    我有一个存储浮点数组的 MongoDB 数据库 假设以下格式的文档集合 id 0 vals 0 8 0 2 0 5 有一个查询数组 例如 带有值 0 1 0 3 0 4 我想计算集合中所有元素的距离 例如 差异之和 对于给定的文档和查询 它
  • PHP MongoDB映射减少数据库断言失败

    我第一次使用 PHP MongoDB 进行 Map Reduce 运行 MapReduce 命令时遇到错误 My code map function emit this topic id re date this date posted r
  • 为什么 Hadoop 中正确的缩减数量是 0.95 或 1.75?

    hadoop 文档指出 正确的归约次数似乎是 0 95 或 1 75 乘以 mapred tasktracker reduce tasks maximum 0 95 所有的减少都可以立即启动并开始 地图完成时传输地图输出 用1 75更快 节
  • 使用 MongoDB 的 MapReduce 选择不同的多个字段

    我想在 MongoDB 上执行这个 SQL 语句 SELECT DISTINCT book author from library 到目前为止 MongoDB 的 DISTINCT 一次仅支持一个字段 对于多个字段 我们必须使用 GROUP
  • Hadoop:Reducer 将 Mapper 输出写入输出文件

    我遇到了一个非常非常奇怪的问题 减速器确实可以工作 但是如果我检查输出文件 我只找到了映射器的输出 当我尝试调试时 在将映射器的输出值类型从 Longwritable 更改为 Text 后 我 发现字数示例存在相同的问题 package o
  • 使用 CouchDB 视图替换 SQL 中的多个联接

    我正在为我的应用程序实现过滤功能 但在 CouchDB 上编写视图时遇到问题 在 SQL 中 这将是一个具有多个连接的语句 如何替换 CouchDB 中的多重连接 本文涵盖单连接 http www cmlenz net archives 2
  • Spark 无法再执行作业。执行器创建目录失败

    我们已经有一个小型 Spark 集群运行了一个月 它已经成功执行了作业 或者让我为该集群启动一个 Spark shell 无论我向集群提交作业还是使用 shell 连接到集群 错误总是相同的 root SPARK HOME bin spar
  • CouchDB“加入”两个文档

    我有两个看起来有点像这样的文档 Doc id AAA creator id data DataKey id credits left 500 times used 0 data id AAA 我想要做的是创建一个视图 它允许我传递 Data
  • java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

    我正在尝试实现reduce side join 并使用mapfile reader来查找分布式缓存 但在stderr中检查时它没有查找值 它显示以下错误 lookupfile文件已经存在于hdfs中 并且似乎已正确加载进入缓存 如标准输出中
  • 将多个前缀行过滤器设置为扫描仪 hbase java

    我想创建一台扫描仪 它可以为我提供带有 2 个前缀过滤器的结果例如 我想要其键以字符串 x 开头或以字符串 y 开头的所有行 目前我知道只能使用一个前缀 方法如下 scan setRowPrefixFilter prefixFiltet 在
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • Sqoop - 绑定到 YARN 队列

    因此 使用 MapReduce v2 您可以使用绑定到某些 YARN 队列来管理资源和优先级 基本上通过使用 hadoop jar xyz jar D mapreduce job queuename QUEUE1 input output

随机推荐