MapReduce Grep Example: Source Code Analysis

Grep.java

Taken from GoogleSource

package org.apache.hadoop.examples;

import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }

    Path tempDir =
      new Path("grep-temp-"+
        Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    Configuration conf = getConf();
    conf.set(RegexMapper.PATTERN, args[2]);
    if (args.length == 4)
      conf.set(RegexMapper.GROUP, args[3]);

    Job grepJob = new Job(conf);

    try {

      grepJob.setJobName("grep-search");
      FileInputFormat.setInputPaths(grepJob, args[0]);
      grepJob.setMapperClass(RegexMapper.class);
      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);
      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);
      grepJob.waitForCompletion(true);

      Job sortJob = new Job(conf);
      sortJob.setJobName("grep-sort");
      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormatClass(SequenceFileInputFormat.class);
      sortJob.setMapperClass(InverseMapper.class);
      sortJob.setNumReduceTasks(1);
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setSortComparatorClass(LongWritable.DecreasingComparator.class);
      sortJob.waitForCompletion(true);
    }
    finally {
      FileSystem.get(conf).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }
}
  • conf.set(RegexMapper.PATTERN, args[2]); sets the regular expression used to match strings.
  • The program consists of two jobs:
    • grep-search
    • grep-sort
  • The grep-search job:
    • The Mapper class is org.apache.hadoop.mapreduce.lib.map.RegexMapper.
    • Both the Combiner and the Reducer class are org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer.
    • That class sums, for each key, all the values emitted by the mappers and writes out the totals (see the sketch after this list).
  • The grep-sort job:
    • Sorts all results in descending order of their counts.
    • sortJob.setNumReduceTasks(1); configures a single reduce task, which guarantees the ordering is global.
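
As a rough illustration of the summing step, the sketch below reimplements the behavior of a LongSumReducer-style reducer: for each key it adds up the LongWritable counts and emits one total. This is a sketch for explanation only (class name SumSketchReducer is made up), not the Hadoop class itself.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch only: sums the per-key counts, like LongSumReducer does.
// The same class can serve as combiner (partial sums on the map side) and as reducer.
public class SumSketchReducer<K> extends Reducer<K, LongWritable, K, LongWritable> {
  private final LongWritable result = new LongWritable();

  @Override
  protected void reduce(K key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();        // each value is a 1 (or a combiner partial sum)
    }
    result.set(sum);
    context.write(key, result);  // e.g. ("dfs.replication", 7)
  }
}

To actually run the example, a command along the lines of bin/hadoop jar hadoop-mapreduce-examples-*.jar grep <inDir> <outDir> 'dfs[a-z.]+' should work, though the jar name and path depend on the Hadoop version and installation layout.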

RegexMapper.java

Source code recreated from a .class file by IntelliJ IDEA
(powered by Fernflower decompiler)

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

@Public
@Stable
public class RegexMapper<K> extends Mapper<K, Text, Text, LongWritable> {
  public static String PATTERN = "mapreduce.mapper.regex";
  public static String GROUP = "mapreduce.mapper.regexmapper.group";
  private Pattern pattern;
  private int group;

  public RegexMapper() {
  }

  @Override
  public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    this.pattern = Pattern.compile(conf.get(PATTERN));
    this.group = conf.getInt(GROUP, 0);
  }

  @Override
  public void map(K key, Text value, Context context) throws IOException, InterruptedException {
    String text = value.toString();
    Matcher matcher = this.pattern.matcher(text);

    while(matcher.find()) {
      context.write(new Text(matcher.group(this.group)), new LongWritable(1L));
    }
  }
}
  • This class extends org.apache.hadoop.mapreduce.Mapper and overrides the setup and map methods.
  • In setup, the PATTERN property is compiled into the pattern field and the GROUP property is read into the group field; both PATTERN and GROUP were set in the driver class. If Java regular-expression usage is unclear, refer to W3Cschool.
  • In map, every substring that matches the regular expression is emitted as a key with a count of 1 (see the standalone example below).
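
To make the interaction between the pattern and the group index concrete, here is a small standalone java.util.regex example (no Hadoop required); the pattern and input string are made up for illustration. Group 0 is the whole match and group 1 is the first capture group, which is exactly what matcher.group(this.group) selects in RegexMapper.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupDemo {
  public static void main(String[] args) {
    // Pattern with one capture group: the word following "dfs."
    Pattern pattern = Pattern.compile("dfs\\.([a-z]+)");
    Matcher matcher = pattern.matcher("dfs.replication=3 dfs.blocksize=128m");

    while (matcher.find()) {
      // group(0) is the whole match, group(1) only the captured part;
      // RegexMapper would emit matcher.group(group) as the output key.
      System.out.println(matcher.group(0) + " -> " + matcher.group(1));
      // prints: dfs.replication -> replication, then dfs.blocksize -> blocksize
    }
  }
}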

InverseMapper.java

Source code recreated from a .class file by IntelliJ IDEA
(powered by Fernflower decompiler)

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.Mapper;

@Public
@Stable
public class InverseMapper<K, V> extends Mapper<K, V, V, K> {
  public InverseMapper() {
  }

  @Override
  public void map(K key, V value, Context context) throws IOException, InterruptedException {
    context.write(value, key);
  }
}
  • This class exists to enable the sort: the driver sets org.apache.hadoop.io.LongWritable.DecreasingComparator.class as the sort comparator for the sort job, and this mapper simply swaps key and value when emitting, turning each (word, count) pair into (count, word).
  • Because the shuffle then sorts on the count key with that decreasing comparator, the final output ends up ordered by count in descending order (a sketch of such a comparator follows).
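
The descending order itself comes from the comparator, not from InverseMapper. The following is only a sketch of how a decreasing comparator for LongWritable keys can be written; the example actually uses Hadoop's built-in LongWritable.DecreasingComparator, whose internals may differ.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Illustrative sketch only: inverts the natural ordering of LongWritable keys,
// so the shuffle sorts counts from largest to smallest.
public class DecreasingLongComparator extends WritableComparator {
  protected DecreasingLongComparator() {
    super(LongWritable.class, true); // create key instances for comparison
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b); // flip the natural ascending order
  }
}

With InverseMapper and a decreasing comparator working together, the single reduce task (the default identity Reducer, since the sort job sets no reducer class) writes the (count, word) pairs from the largest count down to the smallest.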