Grep.java
取自 GoogleSource
(Grep.java 源代码如下)
| package org.apache.hadoop.examples;
import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.map.InverseMapper; import org.apache.hadoop.mapreduce.lib.map.RegexMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class Grep extends Configured implements Tool { private Grep() {} public int run(String[] args) throws Exception { if (args.length < 3) { System.out.println("Grep <inDir> <outDir> <regex> [<group>]"); ToolRunner.printGenericCommandUsage(System.out); return 2; } Path tempDir = new Path("grep-temp-"+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); Configuration conf = getConf(); conf.set(RegexMapper.PATTERN, args[2]); if (args.length == 4) conf.set(RegexMapper.GROUP, args[3]); Job grepJob = new Job(conf); try { grepJob.setJobName("grep-search"); FileInputFormat.setInputPaths(grepJob, args[0]); grepJob.setMapperClass(RegexMapper.class); grepJob.setCombinerClass(LongSumReducer.class); grepJob.setReducerClass(LongSumReducer.class); FileOutputFormat.setOutputPath(grepJob, tempDir); grepJob.setOutputFormatClass(SequenceFileOutputFormat.class); grepJob.setOutputKeyClass(Text.class); grepJob.setOutputValueClass(LongWritable.class); grepJob.waitForCompletion(true); Job sortJob = new Job(conf); sortJob.setJobName("grep-sort"); FileInputFormat.setInputPaths(sortJob, tempDir); sortJob.setInputFormatClass(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); FileOutputFormat.setOutputPath(sortJob, new Path(args[1])); sortJob.setSortComparatorClass(LongWritable.DecreasingComparator.class); sortJob.waitForCompletion(true); } finally { FileSystem.get(conf).delete(tempDir, true); } return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Grep(), args); System.exit(res); } }
|
conf.set(RegexMapper.PATTERN, args[2]);
设置了用于匹配字符串的正则表达式
- 该程序包含两个任务
- grep-search 任务
- Mapper 类为
org.apache.hadoop.mapreduce.lib.map.RegexMapper
- Combiner 和 Reducer 类为
org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer
- 该类把各 Mapper 输出中同一键对应的所有值相加后输出(既作 Combiner 也作 Reducer)
- grep-sort 任务
- 实现了对所有结果按照键值降序排列
sortJob.setNumReduceTasks(1);
设置了只有一个 Reduce Task 保证了排序的全局性
RegexMapper.java
Source code recreated from a .class file by IntelliJ IDEA
(powered by Fernflower decompiler)
(RegexMapper.java 源代码如下)
| package org.apache.hadoop.mapreduce.lib.map;
import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
@Public
@Stable
public class RegexMapper<K> extends Mapper<K, Text, Text, LongWritable> {

  /** Configuration key holding the regular expression to match. */
  public static String PATTERN = "mapreduce.mapper.regex";
  /** Configuration key holding the capture-group index to emit (default 0 = whole match). */
  public static String GROUP = "mapreduce.mapper.regexmapper.group";

  // Compiled once per task in setup(); reused across all map() calls.
  private Pattern pattern;
  private int group;

  // Perf fix: reuse output writables instead of allocating a new Text and
  // LongWritable per match inside the hot map() loop — the framework
  // serializes the values at write() time, so reuse is the Hadoop idiom.
  private final Text matchedText = new Text();
  private static final LongWritable ONE = new LongWritable(1L);

  public RegexMapper() { }

  /** Reads the regex and capture-group index from the job configuration. */
  @Override
  public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    this.pattern = Pattern.compile(conf.get(PATTERN));
    this.group = conf.getInt(GROUP, 0);
  }

  /** Emits (matched-group text, 1) for every regex match in the input line. */
  @Override
  public void map(K key, Text value, Context context)
      throws IOException, InterruptedException {
    Matcher matcher = this.pattern.matcher(value.toString());
    while (matcher.find()) {
      matchedText.set(matcher.group(this.group));
      context.write(matchedText, ONE);
    }
  }
}
|
- 该类继承自
org.apache.hadoop.mapreduce.Mapper
类并重写了 setup 和 map 方法
- setup 方法中,从作业配置里读取 PATTERN 对应的正则表达式并编译为 pattern 对象,读取 GROUP 对应的捕获组编号存入 group 字段(默认为 0,即整个匹配)。PATTERN 和 GROUP 的值在主类中已设置;对 Java 正则表达式用法不熟悉的可参考官方文档或 W3Cschool
- map 方法中,每次匹配到正则表达式的字符串就将其输出
InverseMapper.java
Source code recreated from a .class file by IntelliJ IDEA
(powered by Fernflower decompiler)
(InverseMapper.java 源代码如下)
| package org.apache.hadoop.mapreduce.lib.map;
import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.mapreduce.Mapper;
@Public
@Stable
public class InverseMapper<K, V> extends Mapper<K, V, V, K> {

  /** Mapper that swaps each record's key and value: (k, v) becomes (v, k). */
  public InverseMapper() { }

  /** Emits the input pair with key and value exchanged. */
  @Override
  public void map(K key, V value, Context context)
      throws IOException, InterruptedException {
    final V swappedKey = value;
    final K swappedValue = key;
    context.write(swappedKey, swappedValue);
  }
}
|
- 该类用于排序,因为主类里在 Mapper 之后设置了
org.apache.hadoop.io.LongWritable.DecreasingComparator.class
- 作为 sort 任务的排序比较器。由于 MapReduce 的排序是按 key 进行的,该类把 key 和 value 互换后输出,使计数值成为新的 key
- 最终就可以完成按 value 值的降序排序