一张表相当小、一张表一点都不小,一张表相当小、一张表非常的大

本文是MapReduce系列第二篇,本文是MapReduce系列第二篇

上一篇《MapReduce多样join完成实例分析(一)》,我们能够点击回看该篇作品。本文是MapReduce连串第一篇。

上一篇《MapReduce多种join实现实例分析(一)》,我们能够点击回想该篇小说。本文是MapReduce连串第壹篇。

一、在Map端实行连接
使用景况:一张表十分的小、一张表相当的大。
用法:在付给作业的时候先将小表文件放到该学业的DistributedCache中,然后从DistributeCache中取出该小表进行join
key / value解释分割放到内部存储器中(可以放大Hash
Map等等容器中)。然后扫描大表,看大表中的每条记下的join key
/value值是或不是能够在内部存款和储蓄器中找到同样join key的记录,倘若有则间接出口结果。
一直上代码,比较简单:

壹、在Map端举行连接
接纳处境:一张表非常的小、一张表不小。
用法:在交付作业的时候先将小表文件放到该学业的DistributedCache中,然后从DistributeCache中取出该小表实行join
key / value解释分割放到内部存储器中(可以放大Hash
Map等等容器中)。然后扫描大表,看大表中的每条记下的join key
/value值是不是能够在内部存款和储蓄器中找到同样join key的记录,要是有则向来出口结果。
直接上代码,相比较不难:

package com.mr.mapSideJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.HashMap;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/** 
 * @author zengzhaozheng 
 * 
 * 用途说明: 
 * Map side join中的left outer join 
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show), 
 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|": 
 * id     name  orderid  city_code  is_show 
 * 0       其他        9999     9999         0 
 * 1       长春        1        901          1 
 * 2       吉林        2        902          1 
 * 3       四平        3        903          1 
 * 4       松原        4        904          1 
 * 5       通化        5        905          1 
 * 6       辽源        6        906          1 
 * 7       白城        7        907          1 
 * 8       白山        8        908          1 
 * 9       延吉        9        909          1 
 * -------------------------风骚的分割线------------------------------- 
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 
 * tb_user_profiles.dat文件内容,分隔符为"|": 
 * userID   network     flow    cityID 
 * 1           2G       123      1 
 * 2           3G       333      2 
 * 3           3G       555      1 
 * 4           2G       777      3 
 * 5           3G       666      4 
 * -------------------------风骚的分割线------------------------------- 
 *  结果: 
 *  1   长春  1   901 1   1   2G  123 
 *  1   长春  1   901 1   3   3G  555 
 *  2   吉林  2   902 1   2   3G  333 
 *  3   四平  3   903 1   4   2G  777 
 *  4   松原  4   904 1   5   3G  666 
 */
public class MapSideJoinMain extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

        private HashMap<String,String> city_info = new HashMap<String, String>();   
        private Text outPutKey = new Text();   
        private Text outPutValue = new Text();   
        private String mapInputStr = null;   
        private String mapInputSpit[] = null;   
        private String city_secondPart = null;   
        /** 
         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache 
         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。 
         */
        @Override
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件 
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String cityInfo = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("tb_dim_city.dat")){   
                    //读缓存文件,并放到mem中 
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(cityInfo=br.readLine())){   
                        String[] cityPart = cityInfo.split("\\|",5);   
                        if(cityPart.length ==5){   
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
                        }   
                    }   
                }   
            }   
        }

        /** 
         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的 
         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了 
         */
        @Override
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //排掉空行 
            if(value == null || value.toString().equals("")){   
                return;   
            }   
            mapInputStr = value.toString();   
            mapInputSpit = mapInputStr.split("\\|",4);   
            //过滤非法记录 
            if(mapInputSpit.length != 4){   
                return;   
            }   
            //判断链接字段是否在map中存在 
            city_secondPart = city_info.get(mapInputSpit[3]);   
            if(city_secondPart != null){   
                this.outPutKey.set(mapInputSpit[3]);   
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
                context.write(outPutKey, outPutValue);   
            }   
        }   
    }   
    @Override
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象 
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件 
            Job job=new Job(conf,"MapJoinMR");   
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

            job.setJarByClass(MapSideJoinMain.class);   
            job.setMapperClass(LeftOutJoinMapper.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型 
            job.setMapOutputKeyClass(Text.class);

            //设置reduce的输出key和value类型 
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block 
            logger.error(e.getMessage());   
        }   
    }   
} 
package com.mr.mapSideJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.HashMap;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/** 
 * @author zengzhaozheng 
 * 
 * 用途说明: 
 * Map side join中的left outer join 
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show), 
 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|": 
 * id     name  orderid  city_code  is_show 
 * 0       其他        9999     9999         0 
 * 1       长春        1        901          1 
 * 2       吉林        2        902          1 
 * 3       四平        3        903          1 
 * 4       松原        4        904          1 
 * 5       通化        5        905          1 
 * 6       辽源        6        906          1 
 * 7       白城        7        907          1 
 * 8       白山        8        908          1 
 * 9       延吉        9        909          1 
 * -------------------------风骚的分割线------------------------------- 
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 
 * tb_user_profiles.dat文件内容,分隔符为"|": 
 * userID   network     flow    cityID 
 * 1           2G       123      1 
 * 2           3G       333      2 
 * 3           3G       555      1 
 * 4           2G       777      3 
 * 5           3G       666      4 
 * -------------------------风骚的分割线------------------------------- 
 *  结果: 
 *  1   长春  1   901 1   1   2G  123 
 *  1   长春  1   901 1   3   3G  555 
 *  2   吉林  2   902 1   2   3G  333 
 *  3   四平  3   903 1   4   2G  777 
 *  4   松原  4   904 1   5   3G  666 
 */
public class MapSideJoinMain extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

        private HashMap<String,String> city_info = new HashMap<String, String>();   
        private Text outPutKey = new Text();   
        private Text outPutValue = new Text();   
        private String mapInputStr = null;   
        private String mapInputSpit[] = null;   
        private String city_secondPart = null;   
        /** 
         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache 
         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。 
         */
        @Override
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件 
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String cityInfo = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("tb_dim_city.dat")){   
                    //读缓存文件,并放到mem中 
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(cityInfo=br.readLine())){   
                        String[] cityPart = cityInfo.split("\\|",5);   
                        if(cityPart.length ==5){   
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
                        }   
                    }   
                }   
            }   
        }

        /** 
         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的 
         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了 
         */
        @Override
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //排掉空行 
            if(value == null || value.toString().equals("")){   
                return;   
            }   
            mapInputStr = value.toString();   
            mapInputSpit = mapInputStr.split("\\|",4);   
            //过滤非法记录 
            if(mapInputSpit.length != 4){   
                return;   
            }   
            //判断链接字段是否在map中存在 
            city_secondPart = city_info.get(mapInputSpit[3]);   
            if(city_secondPart != null){   
                this.outPutKey.set(mapInputSpit[3]);   
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
                context.write(outPutKey, outPutValue);   
            }   
        }   
    }   
    @Override
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象 
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件 
            Job job=new Job(conf,"MapJoinMR");   
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

            job.setJarByClass(MapSideJoinMain.class);   
            job.setMapperClass(LeftOutJoinMapper.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型 
            job.setMapOutputKeyClass(Text.class);

            //设置reduce的输出key和value类型 
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block 
            logger.error(e.getMessage());   
        }   
    }   
} 

  

  

那里说说DistributedCache。DistributedCache是分布式缓存的1种完毕,它在全方位MapReduce框架中起着一定主要的功能,他能够帮助大家写一些一定复杂高效的分布式程序。说回来那里,JobTracker在学业运行以前会获取到DistributedCache的能源uri列表,并将相应的文本分发到各类涉及到该学业的任务的TaskTracker上。其它,关于DistributedCache和课业的涉及,比如权限、存款和储蓄路径区分、public和private等质量,接下去有用再整理探讨一下写一篇blog,那里就不详细说了。

此间说说DistributedCache。DistributedCache是分布式缓存的1种完结,它在任何MapReduce框架中起着相当关键的效劳,他得以支撑大家写1些一定复杂高效的分布式程序。说回来那里,JobTracker在作业运转此前会赢得到DistributedCache的财富uri列表,并将相应的文书分发到各样涉及到该学业的任务的TaskTracker上。其它,关于DistributedCache和学业的涉嫌,比如权限、存款和储蓄路径区分、public和private等属性,接下去有用再整治切磋一下写一篇blog,那里就不详细说了。

除此以外还有1种相比变态的Map Join格局,就是整合HBase来做Map
Join操作。那种方法完全可以突破内部存款和储蓄器的决定,使您毫无忌惮的运用Map
Join,而且作用也要命科学。

除此以外还有一种相比较变态的Map Join方式,就是构成HBase来做Map
Join操作。这种格局完全能够突破内部存款和储蓄器的支配,使您毫无忌惮的采用Map
Join,而且功效也分外科学。

二、SemiJoin
SemiJoin正是所谓的半连接,其实仔细1看便是reduce
join的二个变种,正是在map端过滤掉壹部分数额,在网络中只传输加入连接的数额不到场连接的数额不必在网络中展开传输,从而收缩了shuffle的互联网传输量,使全部功效得到增强,别的思想和reduce
join是1模1样的。说得越发接地气一点正是将小表中插手join的key单独抽出来通过DistributedCach分发到有关节点,然后将其取出放到内部存储器中(能够停放HashSet中),在map阶段扫描连接表,将join
key不在内部存款和储蓄器HashSet中的记录过滤掉,让那二个插足join的笔录通过shuffle传输到reduce端进行join操作,别的的和reduce
join都是同1的。

二、SemiJoin
SemiJoin正是所谓的半连接,其实仔细一看正是reduce
join的三个变种,正是在map端过滤掉一部分多少,在互连网中只传输参与连接的数量不参预连接的数目不必在网络中举行传输,从而裁减了shuffle的互联网传输量,使全体效能获得提升,其余思想和reduce
join是一模一样的。说得更其接地气一点就是将小表中到场join的key单独抽出来通过DistributedCach分发到有关节点,然后将其取出放到内存中(能够放置HashSet中),在map阶段扫描连接表,将join
key不在内部存储器HashSet中的记录过滤掉,让这一个参预join的记录通过shuffle传输到reduce端进行join操作,别的的和reduce
join都是一样的。

看代码:

看代码:

package com.mr.SemiJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.ArrayList;   
import java.util.HashSet;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/** 
 * @author zengzhaozheng 
 * 
 * 用途说明: 
 * reudce side join中的left outer join 
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show) 
 * tb_dim_city.dat文件内容,分隔符为"|": 
 * id     name  orderid  city_code  is_show 
 * 0       其他        9999     9999         0 
 * 1       长春        1        901          1 
 * 2       吉林        2        902          1 
 * 3       四平        3        903          1 
 * 4       松原        4        904          1 
 * 5       通化        5        905          1 
 * 6       辽源        6        906          1 
 * 7       白城        7        907          1 
 * 8       白山        8        908          1 
 * 9       延吉        9        909          1 
 * -------------------------风骚的分割线------------------------------- 
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 
 * tb_user_profiles.dat文件内容,分隔符为"|": 
 * userID   network     flow    cityID 
 * 1           2G       123      1 
 * 2           3G       333      2 
 * 3           3G       555      1 
 * 4           2G       777      3 
 * 5           3G       666      4 
 * -------------------------风骚的分割线------------------------------- 
 * joinKey.dat内容: 
 * city_code 
 * 1 
 * 2 
 * 3 
 * 4 
 * -------------------------风骚的分割线------------------------------- 
 *  结果: 
 *  1   长春  1   901 1   1   2G  123 
 *  1   长春  1   901 1   3   3G  555 
 *  2   吉林  2   902 1   2   3G  333 
 *  3   四平  3   903 1   4   2G  777 
 *  4   松原  4   904 1   5   3G  666 
 */
public class SemiJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private HashSet<String> joinKeySet = new HashSet<String>();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        /** 
         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b 
         */
        @Override
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件 
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String joinKeyStr = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("joinKey.dat")){   
                    //读缓存文件,并放到mem中 
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(joinKeyStr=br.readLine())){   
                        joinKeySet.add(joinKeyStr);   
                    }   
                }   
            }   
        }   
        @Override
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //获得文件输入路径 
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //数据来自tb_dim_city.dat文件,标志即为"0" 
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录 
                if(valueItems.length != 5){   
                    return;   
                }   
                //过滤掉不需要参加join的记录 
                if(joinKeySet.contains(valueItems[0])){   
                    flag.set("0");   
                    joinKey.set(valueItems[0]);   
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }//数据来自于tb_user_profiles.dat,标志即为"1" 
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录 
                if(valueItems.length != 4){   
                    return;   
                }   
                //过滤掉不需要参加join的记录 
                if(joinKeySet.contains(valueItems[3])){   
                    flag.set("1");   
                    joinKey.set(valueItems[3]);   
                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }   
        }   
    }   
    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存储一个分组中的左表信息 
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存储一个分组中的右表信息 
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /** 
         * 一个分组调用一次reduce函数 
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /** 
             * 将分组中的元素按照文件分别进行存放 
             * 这种方法要注意的问题: 
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM, 
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最 
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。 
             */
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city 
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles 
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象 
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(SemiJoin.class);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

            job.setMapperClass(SemiJoinMapper.class);   
            job.setReducerClass(SemiJoinReducer.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型 
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);

            //设置reduce的输出key和value类型 
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new SemiJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            logger.error(e.getMessage());   
        }   
    }   
} 
package com.mr.SemiJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.ArrayList;   
import java.util.HashSet;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/** 
 * @author zengzhaozheng 
 * 
 * 用途说明: 
 * reudce side join中的left outer join 
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show) 
 * tb_dim_city.dat文件内容,分隔符为"|": 
 * id     name  orderid  city_code  is_show 
 * 0       其他        9999     9999         0 
 * 1       长春        1        901          1 
 * 2       吉林        2        902          1 
 * 3       四平        3        903          1 
 * 4       松原        4        904          1 
 * 5       通化        5        905          1 
 * 6       辽源        6        906          1 
 * 7       白城        7        907          1 
 * 8       白山        8        908          1 
 * 9       延吉        9        909          1 
 * -------------------------风骚的分割线------------------------------- 
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 
 * tb_user_profiles.dat文件内容,分隔符为"|": 
 * userID   network     flow    cityID 
 * 1           2G       123      1 
 * 2           3G       333      2 
 * 3           3G       555      1 
 * 4           2G       777      3 
 * 5           3G       666      4 
 * -------------------------风骚的分割线------------------------------- 
 * joinKey.dat内容: 
 * city_code 
 * 1 
 * 2 
 * 3 
 * 4 
 * -------------------------风骚的分割线------------------------------- 
 *  结果: 
 *  1   长春  1   901 1   1   2G  123 
 *  1   长春  1   901 1   3   3G  555 
 *  2   吉林  2   902 1   2   3G  333 
 *  3   四平  3   903 1   4   2G  777 
 *  4   松原  4   904 1   5   3G  666 
 */
public class SemiJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private HashSet<String> joinKeySet = new HashSet<String>();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        /** 
         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b 
         */
        @Override
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件 
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String joinKeyStr = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("joinKey.dat")){   
                    //读缓存文件,并放到mem中 
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(joinKeyStr=br.readLine())){   
                        joinKeySet.add(joinKeyStr);   
                    }   
                }   
            }   
        }   
        @Override
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //获得文件输入路径 
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //数据来自tb_dim_city.dat文件,标志即为"0" 
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录 
                if(valueItems.length != 5){   
                    return;   
                }   
                //过滤掉不需要参加join的记录 
                if(joinKeySet.contains(valueItems[0])){   
                    flag.set("0");   
                    joinKey.set(valueItems[0]);   
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }//数据来自于tb_user_profiles.dat,标志即为"1" 
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录 
                if(valueItems.length != 4){   
                    return;   
                }   
                //过滤掉不需要参加join的记录 
                if(joinKeySet.contains(valueItems[3])){   
                    flag.set("1");   
                    joinKey.set(valueItems[3]);   
                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }   
        }   
    }   
    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存储一个分组中的左表信息 
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存储一个分组中的右表信息 
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /** 
         * 一个分组调用一次reduce函数 
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /** 
             * 将分组中的元素按照文件分别进行存放 
             * 这种方法要注意的问题: 
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM, 
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最 
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。 
             */
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city 
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles 
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象 
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(SemiJoin.class);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

            job.setMapperClass(SemiJoinMapper.class);   
            job.setReducerClass(SemiJoinReducer.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型 
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);

            //设置reduce的输出key和value类型 
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new SemiJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            logger.error(e.getMessage());   
        }   
    }   
} 

  

  

那边还说说SemiJoin也是有肯定的适用范围的,其抽取出来实行join的key是要放置内部存款和储蓄器中的,所以不可见太大,简单在Map端造成OOM。

此处还说说SemiJoin也是有早晚的适用范围的,其抽取出来进行join的key是要放置内部存款和储蓄器中的,所以不可能太大,简单在Map端造成OOM。

三、总结
blog介绍了三种join格局。那二种join方式适用于区别的气象,其处理功效上的离开如故蛮大的,当中主要导致因素是网络传输。Map
join效能最高,其次是SemiJoin,最低的是reduce
join。别的,写分布式大数量处理程序的时最佳要对完全要处理的数据分布景况作一个摸底,那可以增强咱们代码的功能,使数据的倾斜度降到最低,使大家的代码倾向性越来越好。

三、总结
blog介绍了二种join格局。那二种join格局适用于分歧的现象,其拍卖效能上的偏离依旧蛮大的,在那之中重点导致因素是网络传输。Map
join作用最高,其次是SemiJoin,最低的是reduce
join。此外,写分布式大数目处理程序的时最棒要对总体要拍卖的数据分布情况作二个叩问,那足以进步我们代码的频率,使数码的倾斜度降到最低,使大家的代码倾向性更加好。

本文写作进程中参阅了新加坡尚学堂连带技术小说,在此多谢上海尚学堂先生的辅助。

正文写作过程中参考了新加坡尚学堂有关技能小说,在此谢谢巴黎尚学堂先生的扶助。