MapReduce编程-join算法实现

2023-05-16

假设有订单表t_order和t_product两张数据库表,现在需要进行关联查询。这样的sql语句很容易写

select  a.id,a.date,b.name,b.category_id,b.price 
from t_order a left out join t_product b 
on a.pid = b.id

那么怎么样用mapreduce来实现呢?通过将关联的条件作为map输出的key(pid,商品id,订单表与商品表是多对一关系),将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联,最后写到一个文件中。

我们可以自定义一个Bean,里面封装了两张表的所有字段信息,最后写出到文件的时候将bean输出即可。

public class InfoBean implements Writable{

    private String oid;//订单id
    private String date;
    private String pid;//商品id
    private int amount;
    private String pname;//商品名称
    private int category_id;//商品类别
    private int price;

    //0:订单信息        1:商品信息
    private int flag;
    //序列化方法
    public void write(DataOutput out) throws IOException {
        out.writeUTF(oid);
        out.writeUTF(date);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeInt(category_id);
        out.writeInt(price);
        out.writeInt(flag);
    }
    //反序列化方法
    public void readFields(DataInput in) throws IOException {
        this.oid=in.readUTF();
        this.date=in.readUTF();
        this.pid=in.readUTF();
        this.amount=in.readInt();
        this.pname=in.readUTF();
        this.category_id=in.readInt();
        this.price=in.readInt();
        this.flag=in.readInt();
    }
    //初始化属性
    public void setInfoBean(String oid, String date, String pid, int amount, String pname, int category_id, int price,
            int flag) {
        this.oid = oid;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
        this.flag = flag;
    }
    //重写toString方法,以便文件中展示
    @Override
    public String toString() {
        return "oid=" + oid + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" + pname
                + ", category_id=" + category_id + ", price=" + price ;
    }
    //序列化必须有无参构造方法
    public InfoBean() {
    }
    getset
}
public class MapReduceJoin {
    static class MapReduceJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
        InfoBean bean = new InfoBean();
        Text text = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            //获取切片
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            //获取文件名,根据文件名进行不同的处理
            //订单文件名order.txt,商品文件名product.txt
            String name = inputSplit.getPath().getName();
            String pid = "";
            if(name.startsWith("order")){
                String[] fields = line.split(",");
                pid = fields[2];
                //订单文件
                bean.setInfoBean(fields[0], fields[1], pid, 
                        Integer.parseInt(fields[3]), "", 0, 0, 0);
            }else {
                String[] fields = line.split(",");
                pid = fields[0];
                //商品文件
                bean.setInfoBean("", "", pid, 
                        0, fields[1], Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), 1);
            }
            text.set(pid);
            context.write(text, bean);
        }
    }
    static class MapReduceJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{
        //传来的一组信息中只会有一个商品bean可能有多个订单bean
        @Override
        protected void reduce(Text key, Iterable<InfoBean> beans,
                Context context) throws IOException, InterruptedException {
            //用来存储唯一的商品bean信息
            InfoBean ProductBean = new InfoBean();
            //用来存储多个订单信息的集合
            List<InfoBean> orderList = new ArrayList<InfoBean>();
            for (InfoBean bean : beans) {
                //判断是订单还是商品信息
                int flag = bean.getFlag();
                if(flag == 1){
                    //商品信息,只能是一个
                    try {
                        BeanUtils.copyProperties(ProductBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else {
                    //订单信息,可能是多个
                    InfoBean orderBean = new InfoBean();
                    try {
                        BeanUtils.copyProperties(orderBean, bean);
                        orderList.add(orderBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            for (InfoBean orderBean : orderList) {
                //多个订单信息中设置好对应的商品信息
                orderBean.setPname(ProductBean.getPname());
                orderBean.setCategory_id(ProductBean.getCategory_id());
                orderBean.setPrice(ProductBean.getPrice());
                context.write(orderBean, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapReduceJoin.class);
        //指定本业务job要使用的mapper,reducer业务类
        job.setMapperClass(MapReduceJoinMapper.class);
        job.setReducerClass(MapReduceJoinReducer.class);
        //虽然指定了泛型,以防框架使用第三方的类型
        //指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        //指定最终输出的数据的kv类型
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        //指定job输入原始文件所在位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job输入原始文件所在位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

测试:将工程打包上传到linux
创建输入目录
创建订单文件和商品文件编辑字段信息
将文件传到输入目录
执行程序
查看生产文件的内容

[root@mini1 ~]# hadoop fs -mkdir -p /mrjoin/input
[root@mini1 ~]# vi order.txt
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1003,20170710,P0002,4
[root@mini1 ~]# vi product.txt 
P0001,xiaomi4,1000,2
P0002,iphone6s,1000,3
[root@mini1 ~]# hadoop fs -put order.txt product.txt /mrjoin/input/
[root@mini1 ~]# hadoop jar mrjoin.jar com.scu.hadoop.rjoin.MapReduceJoin /mrjoin/input /mrjoin/output
[root@mini1 ~]# hadoop fs -cat /mrjoin/output/part-r-00000
oid=1002, date=20170710, pid=P0001, amount=3, pname=xiaomi4, category_id=1000, price=2
oid=1001, date=20170710, pid=P0001, amount=1, pname=xiaomi4, category_id=1000, price=2
oid=1003, date=20170710, pid=P0002, amount=4, pname=iphone6s, category_id=1000, price=3
oid=1003, date=20170710, pid=P0002, amount=3, pname=iphone6s, category_id=1000, price=3
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce编程-join算法实现 的相关文章

  • RFC8314文档中对465端口和587端口的阐述

    最近在学习SMTP的时候发现SMTP在使用加密传输的时候涉及到465和587两个端口 xff0c 网上对两者之间的区别众说纷纭 xff0c 后来查到了RFC官方文档中对于这个争论较久的问题的定义和详细说明 xff0c 这里做转载和翻译用于记
  • nginx篇08-添加客户端证书认证

    本文主要介绍如何使用给nginx服务添加客户端证书认证从而实现双向加密 对于一般的https网站来说 xff0c 实际上https所使用的证书是属于单向验证 xff0c 即客户端单向验证服务器的安全性 xff0c 而服务器端是没有对客户端的
  • Linux 查找搜索命令 5种方式

    一 whereis命令 该指令会在特定目录中查找符合条件的文件 这些文件应属于原始代码 二进制文件 xff0c 或是帮助文件 该指令只能用于查找二进制文件 源代码文件和man手册页 xff0c 一般文件的定位需使用locate命令 简单理解
  • SUMO学习

    SUMO学习 SUMO简介1 车道模型2 跟驰模型跟驰模型CACC 3 变道模型1 Strategic change 战略变道2 Cooperative change 协同变道3 Tactical change 战术变道4 Obligato
  • 51单片机学习笔记4 -- 蜂鸣器控制

    蜂鸣器控制 1 蜂鸣器简介1 分类2 有源蜂鸣器和无源蜂鸣器3 区分有源蜂鸣器和无源蜂鸣器4 蜂鸣器驱动电路 2 电路图绘制3 蜂鸣器控制4 程序补充 1 蜂鸣器简介 蜂鸣器是一种一体化结构的电子讯响器 xff0c 采用直流电压供电 xff
  • 树莓派3B+安装系统,配置基本环境、更换国内镜像源,适用pi4

    树莓派3B 43 安装系统 系统镜像下载 树莓派官方镜像下载地址 xff1a 自行百度 xff0c 官方网站首页 xff0c 点击Downloads 安装镜像 准备一张8G以上的内存卡 xff0c 推荐16G以上 下载系统制作软件etche
  • Linux超强截图工具flameshot

    Pop OS自带的截屏快捷键如下 但讲道理这个是真的不好用 所以我们借助第三方的截图工具 xff0c 这里推荐flameshot 火焰截图 在终端键入以下命令即可安装 span class token function sudo span
  • 5/1-3 BP神经网络的改进及MATLAB实现(下)

    5 1 3 BP神经网络的改进及MATLAB实现 xff08 下 xff09 文章目录 5 1 3 BP神经网络的改进及MATLAB实现 xff08 下 xff09 1 自适应梯度下降法 xff08 Adagrad xff09 2 动量法
  • “操作无法完成因为其中的文件夹或文件已在另一个程序中打开”解决办法

    在windows系统中 xff0c 我们经常会遇到这样一个问题 xff1a 删除某一个文件或者文件夹 xff0c 被提醒 xff1a 操作无法完成 xff0c 因为其中的文件夹或文件已在另一个程序中打开 这个时候我们一般会先检查是否真的有程
  • windows下N卡提取或者刷VBIOS

    windows下N卡提取或者刷VBIOS 这里是简介 xff1a 在windows下怎么提取出显卡的VBIOS和刷VBIOS引用 本文已 GeForce MX450显卡为例 xff0c 需要借助工具 lt 1 gt 工具介绍 xff08 N
  • 论文:Threat of Adversarial Attacks on Deep Learning in Computer Vision: A Survey翻译工作

    关于对抗性攻击对深度学习威胁的研究 Naveed Akhtar and Ajmal Mian ACKNOWLEDGEMENTS The authors thank Nicholas Carlini UC Berkeley and Dimit
  • 802.1x认证方式(EAP中继认证与EAP终结认证)

    文章目录 1 前言2 协议说明3 报文分析EAP中继模式 MD5 challengeEAP终结模式 MD5 challengeRadius CHAP认证原理Message Authenticator消息验证器计算参考连接 xff1a 1 前
  • ubuntu 换源深层次解析

    换源也是一个容易出错的问题 xff0c 本文以树莓派为例展开 xff0c x86也是一样的操作 那么假设成立的话 xff0c 就要记住我们是在树莓派 xff08 arm xff09 上安装的ubuntu xff0c 不是X86 xff0c
  • MySql数据库查询(一)——单表查询

    1 查询所有字段 SELECT FROM 表名称 xff1b 例如查询book表中所有的数据 xff1a select from book 2 查询指定字段 SELECT 字段1 xff0c 字段2 xff0c 字段3 FROM 表名称 x
  • Linux系统安装Anaconda

    本文软件信息 xff1a 系统 xff1a RHEL8软件 xff1a Anaconda3 2022 10 Linux x86 64 sh Linux安装Anaconda的步骤都一样 xff0c 没啥差别 下载Anaconda的安装包 在官
  • 基于TensorFlow的VGG16模型源码

    我看了网上的一些源码程序 xff0c 自己下载跑一哈 xff0c 发现有很多的错误 xff0c 不知道是我电脑原因 xff0c 还是tensorflow版本问题 xff0c 我自己基于别人的源码修改了一些细节 xff0c 使程序可以顺利运行
  • 【vscode】c++使用vector报错ERROR: Unable to start debugging. Unexpected GDB output from command “-exec-run

    完整报错是在DEBUG CONSOLE ERROR Unable to start debugging Unexpected GDB output from command 34 exec run 34 During startup pro
  • Maven依赖改为Gradle写法

    Maven写法 lt dependency gt lt groupId gt org apache poi lt groupId gt lt artifactId gt poi lt artifactId gt lt version gt
  • 构造方法

    构造方法可以重载 xff08 可以直接给方法变量进行赋值 xff0c 方便些 xff0c 减少setter xff0c getter方法的使用 xff0c 但不是说setter xff0c getter方法没用 xff09 注意事项 xff
  • Java项目--书评网信息系统

    1 项目背景 在学习完ssm相关知识后 xff0c 有了基础能力就想通过完成一个javaweb项目来巩固自己所学知识以及在具体开发过程中找出自己的不足因此便完成了书评网信息系统 2 项目功能 手机端网站 首页 书籍详情页 评论区 登陆界面

随机推荐