MapReduce (5): Implementing an Inverted Index with MapReduce (Revised)
Published: 2019-06-22


(I have always felt that the previous implementation, http://www.cnblogs.com/i80386/p/3444726.html, had problems.) A combiner pre-aggregates the output of the map tasks on the machine where it was produced, before anything is sent to the reducers. So here is a reimplementation.

Approach: the first MapReduce job only computes the <word:docid, count> statistics, i.e. how many times a word appears in a given document (the same idea as wordcount, except that "word" is replaced by "word:docid"). The second MapReduce job splits word:docid apart in the map phase and re-emits it as <word, docid:count>; the combine and reduce phases (combine and reduce use the same function) then merge these pairs into the format <word, docid1:count1,docid2:count2,...>.
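To make the role of the combiner concrete, here is a minimal plain-Java sketch (no Hadoop involved; the class name is made up for illustration) of what local pre-aggregation does to one map task's output before it is shuffled to the reducers:

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: what a combiner does to one map task's output.
public class CombinerSketch {
    public static void main(String[] args) {
        // Raw map output for 2.txt: one (word:docid, 1) pair per token.
        String[] mapOutputKeys = {
                "Hello:2.txt", "MapReduce:2.txt", "bye:2.txt", "MapReduce:2.txt"
        };

        // The combiner sums the values per key locally, so "MapReduce:2.txt"
        // crosses the network once with count 2 instead of twice with count 1.
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (String key : mapOutputKeys) {
            combined.merge(key, 1, Integer::sum);
        }

        combined.forEach((k, v) -> System.out.println(k + "\t" + v));
        // Hello:2.txt      1
        // MapReduce:2.txt  2
        // bye:2.txt        1
    }
}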
Approach in detail.

Sample input:
0.txt  MapReduce is simple
1.txt  MapReduce is powerfull is simple
2.txt  Hello MapReduce bye MapReduce

Two jobs are used.

Job 1 (identical to wordcount, except that word is replaced by word:docid):
1. map function: context.write(word:docid, 1), i.e. word:docid is the map output key.
   output key          output value
   MapReduce:0.txt     1
   is:0.txt            1
   simple:0.txt        1
   MapReduce:1.txt     1
   is:1.txt            1
   powerfull:1.txt     1
   is:1.txt            1
   simple:1.txt        1
   Hello:2.txt         1
   MapReduce:2.txt     1
   bye:2.txt           1
   MapReduce:2.txt     1
2. Partitioner: HashPartitioner, which partitions on the map output key (word:docid).
3. reduce function: sums the input values.
   output key          output value
   MapReduce:0.txt     1
   is:0.txt            1
   simple:0.txt        1
   MapReduce:1.txt     1
   is:1.txt            2
   powerfull:1.txt     1
   simple:1.txt        1
   Hello:2.txt         1
   MapReduce:2.txt     2
   bye:2.txt           1

Job 2:
1. map function: splits word:docid and re-emits <word, docid:count>.
   input key/value        output key/value
   MapReduce:0.txt 1   => MapReduce 0.txt:1
   is:0.txt 1          => is 0.txt:1
   simple:0.txt 1      => simple 0.txt:1
   MapReduce:1.txt 1   => MapReduce 1.txt:1
   is:1.txt 2          => is 1.txt:2
   powerfull:1.txt 1   => powerfull 1.txt:1
   simple:1.txt 1      => simple 1.txt:1
   Hello:2.txt 1       => Hello 2.txt:1
   MapReduce:2.txt 2   => MapReduce 2.txt:2
   bye:2.txt 1         => bye 2.txt:1
2. reduce function (concatenates the values):
   output key     output value
   MapReduce      0.txt:1,1.txt:1,2.txt:2
   is             0.txt:1,1.txt:2
   simple         0.txt:1,1.txt:1
   powerfull      1.txt:1
   Hello          2.txt:1
   bye            2.txt:1
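The expected output above can be checked without a cluster. The following is a minimal plain-Java simulation (no Hadoop; the class name and structure are made up for illustration) that runs the two phases over the three sample documents and prints the resulting inverted index:

import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation of the two-job pipeline on the sample documents.
public class InvertedIndexSimulation {
    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("0.txt", "MapReduce is simple");
        docs.put("1.txt", "MapReduce is powerfull is simple");
        docs.put("2.txt", "Hello MapReduce bye MapReduce");

        // Job 1: count occurrences of each word:docid pair.
        Map<String, Integer> wordDocCounts = new LinkedHashMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String token : doc.getValue().split("\\s")) {
                wordDocCounts.merge(token + ":" + doc.getKey(), 1, Integer::sum);
            }
        }

        // Job 2: re-key by word and concatenate the docid:count postings.
        Map<String, StringBuilder> index = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : wordDocCounts.entrySet()) {
            int splitIndex = e.getKey().indexOf(":");
            String word = e.getKey().substring(0, splitIndex);
            String posting = e.getKey().substring(splitIndex + 1) + ":" + e.getValue();
            index.computeIfAbsent(word, k -> new StringBuilder())
                 .append(posting).append(",");
        }

        index.forEach((word, postings) -> System.out.println(word + "\t" + postings));
        // e.g. MapReduce  0.txt:1,1.txt:1,2.txt:2,
    }
}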
The complete implementation:

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class MyInvertIndex {

    // Job 1 mapper: emits <word:docid, 1> for every token in the input split.
    public static class SplitMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            // String pth = split.getPath().toString();
            String name = split.getPath().getName();
            String[] tokens = value.toString().split("\\s");
            for (String token : tokens) {
                context.write(new Text(token + ":" + name), new IntWritable(1));
            }
        }
    }

    // Job 2 mapper: turns <word:docid, count> into <word, docid:count>.
    public static class CombineMapper extends
            Mapper<Text, IntWritable, Text, Text> {
        public void map(Text key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            int splitIndex = key.toString().indexOf(":");
            context.write(new Text(key.toString().substring(0, splitIndex)),
                    new Text(key.toString().substring(splitIndex + 1) + ":"
                            + value.toString()));
        }
    }

    // Job 2 combiner/reducer: concatenates all docid:count pairs of a word.
    public static class CombineReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer buff = new StringBuffer();
            for (Text val : values) {
                buff.append(val.toString() + ",");
            }
            context.write(key, new Text(buff.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:9000/in_invertedindex";
        String dir_out = "hdfs://localhost:9000/out_invertedindex";
        Path in = new Path(dir_in);
        Path out = new Path(dir_out);
        // Temporary path holding the <word:docid, count> output of job 1.
        Path path_tmp = new Path("word_docid"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        Configuration conf = new Configuration();
        try {
            // Job 1: count how often each word:docid pair occurs.
            Job countJob = new Job(conf, "invertedindex_count");
            countJob.setJarByClass(MyInvertIndex.class);
            countJob.setInputFormatClass(TextInputFormat.class);
            countJob.setMapperClass(SplitMapper.class);
            countJob.setCombinerClass(IntSumReducer.class);
            countJob.setPartitionerClass(HashPartitioner.class);
            countJob.setMapOutputKeyClass(Text.class);
            countJob.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(countJob, in);
            countJob.setReducerClass(IntSumReducer.class);
            // countJob.setNumReduceTasks(1);
            countJob.setOutputKeyClass(Text.class);
            countJob.setOutputValueClass(IntWritable.class);
            countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileOutputFormat.setOutputPath(countJob, path_tmp);
            countJob.waitForCompletion(true);

            // Job 2: regroup by word and concatenate the docid:count postings.
            Job combineJob = new Job(conf, "invertedindex_combine");
            combineJob.setJarByClass(MyInvertIndex.class);
            combineJob.setInputFormatClass(SequenceFileInputFormat.class);
            combineJob.setMapperClass(CombineMapper.class);
            combineJob.setCombinerClass(CombineReducer.class);
            combineJob.setPartitionerClass(HashPartitioner.class);
            combineJob.setMapOutputKeyClass(Text.class);
            combineJob.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(combineJob, path_tmp);
            combineJob.setReducerClass(CombineReducer.class);
            // combineJob.setNumReduceTasks(1);
            combineJob.setOutputKeyClass(Text.class);
            combineJob.setOutputValueClass(Text.class);
            combineJob.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(combineJob, out);
            combineJob.waitForCompletion(true);
        } finally {
            // Always clean up the intermediate output of job 1.
            FileSystem.get(conf).delete(path_tmp, true);
        }
    }
}
Result of running the two jobs:

Hello        2.txt:1,,
MapReduce    2.txt:2,1.txt:1,0.txt:1,,
bye          2.txt:1,,
is           1.txt:2,0.txt:1,,
powerfull    1.txt:1,,
simple       1.txt:1,0.txt:1,,
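The doubled and trailing commas come from using CombineReducer as both the combiner and the reducer of the second job: the combiner already emits values ending in ",", and the reducer then appends another "," after each combined value. If a cleaner posting list is wanted, one possible variant (an assumption on my part, not part of the original post) is to strip the separator in the reduce body; this is a drop-in replacement for CombineReducer.reduce:

// Variant of CombineReducer.reduce that drops the trailing comma.
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    StringBuilder buff = new StringBuilder();
    for (Text val : values) {
        // Values produced by the combiner may already end in ",", so trim first.
        String v = val.toString();
        if (v.endsWith(",")) {
            v = v.substring(0, v.length() - 1);
        }
        buff.append(v).append(",");
    }
    if (buff.length() > 0) {
        buff.setLength(buff.length() - 1);  // remove the final separator
    }
    context.write(key, new Text(buff.toString()));
}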