这一章主要是走开发Map/Reduce流程,另外介绍Hadoop的配置。
1、配置类org.apache.hadoop.conf.Configuration
属性名是String,Value是boolean、int、long、float、String、Class、File等。
2、上面提到的Configuration是从如下的xml配置文件中读出来的。
注意格式!
<?xml version="1.0"?> <configuration> <property> <name>color</name> <value>yellow</value> <description>Color</description> </property> ...其他property... </configuration>
读取方法:
Configuration conf = new Configuration(); conf.addResource("configuration-1.xml"); System.out.println(conf.get("color"));
要注意的是,Configuration不能识别value的类型,需要自己从getXXX的XXX时给定。
3、可以通过conf.addResource()来指定多个Conf配置文件,后添加的将覆盖前面的key-value。但是,标记为final的属性无法被覆盖!!
4、可以有类似Python的Configuration的引用其他变量做为Value的方法。如,${size}
5、经过实验,默认构造函数的Configuration,只会读取core-site.xml(文档说,还有core-default.xml)……如果需要其他的,自己addResource()吧。
6、Hadoop提供了CmdLine的解析接口,GenericOptionsParser,一般不用直接使用它,而用Tool和ToolRunner接口。
7、关于UnitTest,由于Map/reduce都是直接或者间接写入到OutputCollector中,所以需要一个Mock来替代些UnitTest。
以Mockito为例:
import static org.mockito.Mockito.*; import java.io.IOException; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.OutputCollector; import org.junit.*; public class MaxTemperatureMapperTest { @Test public void processesValidRecord() throws IOException { //构造Mapper MaxTemperatureMapper mapper = new MaxTemperatureMapper(); Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" + // Year ^^^^ "99999V0203201N00261220001CN9999999N9-00111+99999999999"); // Temperature ^^^^^ //用Mock模拟出一个collector OutputCollector<Text, IntWritable> output = mock(OutputCollector.class); mapper.map(null, value, output, null); //检查collector是否被以参数1950和-11调用过 verify(output).collect(new Text("1950"), new IntWritable(-11)); } }
8、测试Reducer,Mock的时候,注意Reduce接受的input是多个value,如下:
Iterator<IntWritable> values = Arrays.asList(new IntWritable(10), new IntWritable(5)).iterator();
9、Local模式一般只用来做测试,最大的区别是,最多只能运行一个Reducer,他将忽略你指定的Reducer个数。
10、在写map、reduce的过程中,应当尽量使其业务逻辑分离(如单独写一个Parser,而不是都写在Parser中),这样可以方便进行测试。
11、hadoop提供了MiniDFSCluster和MiniMRCluster,即伪集群,用于在单机模拟集群上的运行效果。
12、JobHistory用于任务的恢复,于Local和Job的_logs/history中各存储一份。
13、Job完成时,每个Reducer会产生一个文件,part-xxxx的文件名,放在你指定的output目录下。
由于过于零碎,一般都会merge一下,hadoop就提供了很好的merge功能,用fs的 -getmerge命令。
如果需要,可以再sort
hadoop fs -getmerge max-temp max-temp-local sort max-temp-local | tail
14、Map/Reduce的Debug的方式,除了打Log外,可以使用Counter,在异常值除增加Counter
例如,对温度大于100的,增加Enum.OVER_100的Counter。
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { enum Temperature { OVER_100 } private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature(); //不太可能出现的异常值 if (airTemperature > 1000) { //同时将可疑的输入Log下来 System.err.println("Temperature over 100 degrees for input: " + value); //设置Status,将显示在Web界面上,便于我们知晓错误发生了,可惜只能设置一个 reporter.setStatus("Detected possibly corrupt record: see logs."); //增加counter reporter.incrCounter(Temperature.OVER_100, 1); } output.collect(new Text(parser.getYear()), new IntWritable(airTemperature)); } } }
15、Hadoop也提供了Remote Debugger,用于远程单步跟踪JVM
16、Map/Reduce的性能调优可能是我们最感兴趣的话题了。
总体来说,可以优化的地方有
(1)Map的数量,特别是当你的Map运行很长时间时,应考虑增加map数量。而如果小文件过多,应考虑用SequenceFile。
(2)Reduce的数量,应当充分利用CPU核资源,因为Reduce一般是CPU密集型作业。
(3)Combiner,是否可以通过Combiner进行优化。
(4)map的output的压缩
(5)memory的调优。
在后面的几章,会陆续涉及到这些内容
17、Profile。想要性能调优,你先得知道哪些地方是瓶颈!
我们可以借助HPROF(JDK5起提供的Profile工具)。
在Run驱动里做类似如下的设置:
conf.setProfileEnabled(true); conf.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,depth=6," + "force=n,thread=y,verbose=n,file=%s"); //第一个参数为是针对map(true)还是reduce(false),第二个参数是map/reduce的id conf.setProfileTaskRange(true, "0-2")
当任务执行完毕后,会生成Profile详情:
18、关于如何创建复杂的Map、Reduce算法,可以参考:《Data-Intensive Text Processing with MapReduce》
19、如何运行有依赖关系的任务,可以使用JobControl类,它接受自定义的DAG图。或者使用Oozie,也可以完成。