1、首先定义数据如下
预期结果
2、
分析:Mapper传给Reducer的格式应该是(关键字,文档名),Reducer层会自动将inputKey相同的inputValue值组成一个list,然后遍历输出就OK。
实现:原理知道了,开始实现
public class Index {
private static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // 可以获取文件名,根据文件名来判定传入reducer的形式 String fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); System.out.println(fileName); String[] strs = value.toString().trim().split(" "); for(int i=0; i<strs.length; i++){ context.write(new Text(strs[i]), new Text(fileName)); } } }private static class IndexReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text value, Iterable<Text> datas, Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { String resultStr=""; for(Text data:datas){ String[] strs = data.toString().split("[.]"); String[] res = resultStr.split(","); if(!strs[0].equals(res[res.length-1])){ resultStr+=strs[0]+","; } resultStr = resultStr.substring(0, resultStr.length()-1); context.write(NullWritable.get(), new Text(value.toString()+":"+resultStr)); } } } public static void main(String[] args) { try { Configuration cfg = HadoopCfg.getConfigration(); Job job = Job.getInstance(cfg); job.setJobName("Index"); job.setJarByClass(Index.class); job.setMapperClass(IndexMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(IndexReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path("/input/index")); FileOutputFormat.setOutputPath(job, new Path("/index/")); System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } }}
输出就不说了,和期望的一样。大家可以试试,这个程序和逻辑都比较简单。
写在最后:每天积累一点就是离理想更近一步。不要管程序简单与否,踏踏实实扎实每一步才是硬道理。下一篇博客我将与大家分享稍微复杂的倒排索引实现。
大家下来可以自己练一下哟~~~
PS:测试数据与现在一样,预期输出为: