1. The Map/Reduce Task
Input:
File format (one record per line, tab-separated):
id	value
where id is a random integer between 1 and 100 and value is a random float between 1 and 100.
Output:
The maximum value for each id.
Generating files like this is easy with Python; see Appendix 1 at the end of this post.
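For illustration only (the real values are random, so the lines below are made up), an input file might contain:

3	15.103881
87	66.012894
3	97.104400

and for just those three lines the job would output 97.104400 for id 3 and 66.012894 for id 87.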
2. The Map/Reduce Program
Here I use the new (0.20.2) API directly, i.e. the interfaces under org.apache.hadoop.mapreduce.*.
Note in particular that
job.setNumReduceTasks(5)
sets the number of reduce tasks for this job; the default is 1.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxNumber {

    static class MaxNumberMapper extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] tmp = line.split("\t");
            // Parse id and value
            long id = -1;
            double val = -1;
            try {
                id = Long.parseLong(tmp[0]);
                val = Double.parseDouble(tmp[1]);
            } catch (Exception e) {
                // Malformed line: leave id/val at -1 and skip it below
            }
            // Collect
            if (id != -1 && val != -1) {
                context.write(new LongWritable(id), new DoubleWritable(val));
            }
        }
    }

    static class MaxNumberReducer extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

        public void reduce(LongWritable key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Traverse all of a key's values and keep the max.
            // Note: -Double.MAX_VALUE (not Double.MIN_VALUE) is the smallest double.
            double max = -Double.MAX_VALUE;
            for (DoubleWritable val : values) {
                max = Math.max(max, val.get());
            }
            // Collect result
            context.write(key, new DoubleWritable(max));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage:");
            System.err.println("MaxNumber <input_path> <output_path>");
            System.exit(-1);
        }

        // Job basics
        Job job = new Job();
        job.setJobName("Max Number Map/Reduce");
        job.setJarByClass(MaxNumber.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Mapper and Reducer classes
        job.setMapperClass(MaxNumberMapper.class);
        job.setReducerClass(MaxNumberReducer.class);

        // A Combiner is useful in our case
        // job.setCombinerClass(MaxNumberReducer.class);

        // Output key/value classes
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Number of reduce tasks
        job.setNumReduceTasks(5);

        // Return exit code to the shell
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. Cluster Configuration
Beyond the standard Hadoop cluster configuration, pay extra attention to the following points:
Node deployment
Five nodes, hadoop1 through hadoop5, mapped to their internal IPs in /etc/hosts as follows.
Pay special attention: the /etc/hosts mapping must agree with each node's hostname!
For example, here I commented out the public-network entry (the 50.* line) for hadoop1.
127.0.0.1       localhost localhost.localdomain
#50.57.48.244   hadoop1

# For Hadoop Nodes
10.182.169.24   hadoop1
10.182.169.29   hadoop2
10.182.169.30   hadoop3
10.182.169.31   hadoop4
10.182.169.32   hadoop5
Configuration files
(1) core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://hadoop1:54310</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/hadoop_home/var</value>
    </property>
</configuration>
(2) mapred-site.xml
Here, mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum
control the maximum number of map and reduce tasks that can run concurrently on each node.
Map tasks are usually I/O-bound, so the map maximum can be set to the number of CPU cores.
Reduce tasks are usually CPU-bound, so a setting of 1 to 2 is reasonable.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>hadoop1:54311</value>
    </property>
    <property>
        <name>mapred.local.dir</name>
        <value>/home/hadoop/hadoop_home/var</value>
    </property>
    <property>
        <name>mapred.tasktracker.map.tasks.maximum</name>
        <value>4</value>
    </property>
    <property>
        <name>mapred.tasktracker.reduce.tasks.maximum</name>
        <value>1</value>
    </property>
</configuration>
(3) hdfs-site.xml
Note: I use a replication factor of 3 here, since all 5 nodes run a DataNode.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>
4. Test Results
50 test files, each about 60 MB.
(1) max map = 4, max reduce = 2, job.setNumReduceTasks(1)
Up to 20 map tasks run concurrently (5 * 4)
Only 1 reduce task
Time: 7mins, 4sec
(2) max map = 4, max reduce = 2, job.setNumReduceTasks(10)
Up to 20 map tasks run concurrently (5 * 4)
10 reduce tasks in parallel
Time: 3mins, 58sec
(3) max map = 4, max reduce = 1, job.setNumReduceTasks(5)
Up to 20 map tasks run concurrently (5 * 4)
5 reduce tasks in parallel
Time: 4mins, 4sec
(4) max map = 4, max reduce = 1, job.setNumReduceTasks(5), with the Combiner enabled
Up to 20 map tasks run concurrently (5 * 4)
5 reduce tasks in parallel, and the shuffle/copy phase is very fast
Time: 1min, 49sec
Of course, not every Map/Reduce job can reuse its reducer as a combiner like this; see the sketch after these results.
(5) Single machine with a Python script (see Appendix 2)
Time: 11mins, 5sec
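A reducer can double as a combiner only when the reduce function is commutative and associative and its input and output key/value types are identical; taking the maximum satisfies both, which is why job.setCombinerClass(MaxNumberReducer.class) is safe above. As a hedged illustration, MeanReducer below is a hypothetical class (not part of this job, same imports as the program in section 2) whose per-id average would break if it were also registered as the combiner:

// Hypothetical MeanReducer, shown only to illustrate why averaging cannot
// reuse the reducer as a combiner; it is not part of the MaxNumber job.
static class MeanReducer
        extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

    public void reduce(LongWritable key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (DoubleWritable val : values) {
            sum += val.get();
            count++;
        }
        // Correct only if this reduce() call sees every original value for the key.
        // If this class were also set as the combiner, the reducer would end up
        // averaging partial averages, which is wrong; a safe combiner would have
        // to emit partial (sum, count) pairs instead.
        context.write(key, new DoubleWritable(sum / count));
    }
}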
Summary:
The 5-node Hadoop cluster is roughly 2.8 times as fast as the single-machine Python script (665s vs. 244s), and with the Combiner it reaches about 6 times (665s vs. 109s).
Hadoop's performance is still not ideal because I did no performance tuning, e.g. memory, compressed storage, or data transfer.
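As a rough idea of what such tuning might look like, here is a minimal sketch that enables map-output compression and a larger map-side sort buffer in the driver, assuming the 0.20-era property names mapred.compress.map.output, mapred.map.output.compression.codec and io.sort.mb. The class name MaxNumberTuned is made up, and none of these settings were used in the runs above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;

public class MaxNumberTuned {
    public static void main(String[] args) throws Exception {
        // Sketch only: compress intermediate map output and enlarge the
        // map-side sort buffer, then build the Job from this Configuration.
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
                GzipCodec.class, CompressionCodec.class);
        conf.setInt("io.sort.mb", 200);  // map-side sort buffer in MB; 0.20 default is 100
        Job job = new Job(conf, "Max Number Map/Reduce");
        // ... setJarByClass / input / output / mapper / reducer as in section 2 ...
    }
}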
Appendix 1: Python script that generates the input test data
import random

if __name__ == "__main__":
    path = "/home/hadoop/hadoop_home/input"
    files = 10
    rows = 10000000
    for i in xrange(files):
        fp = open(path + "/" + str(i), "w")
        for j in xrange(rows):
            print >>fp, "%d\t%f" % (random.randint(1, 100), random.random() * 100)
        fp.close()
Appendix 2: the single-machine Python script
import os, time

if __name__ == "__main__":
    start = int(time.time())
    path = r"/home/hadoop/hadoop_home/input"
    dict = {}
    # Walk every input file and track the maximum value seen for each id
    for root, dirs, files in os.walk(path):
        for f in files:
            file = root + "/" + f
            for line in open(file, "r").readlines():
                id, val = line.split("\t")
                id = str(id)
                val = float(val)
                dict.setdefault(id, -1)
                dict[id] = max(val, dict[id])
    for k, v in dict.items():
        print k, v
    end = int(time.time())
    print str(end - start), "(s)"