本来是要用Hadoop给柯西搜索写一下锚文本聚集。但是发现上个版本的爬虫居然没有存锚文本,实在无聊,写了个统计域名(实际是host)的计数器。
输入:一行一个url
流程:提取url的domain,对domain计数+1
输出:域名,域名计数
这次用的是 org.apache.hadoop.mapred 包下的接口(即旧版 API;新版 API 在 org.apache.hadoop.mapreduce 包中)。
代码如下:
Mapper
package com.keseek.hadoop;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Maps one URL per input line to the pair (host, 1).
 *
 * <p>Input value: a line of text containing a single URL. Lines whose URL
 * cannot be parsed, or that have no host component, are silently skipped.
 */
public class DomainCountMapper implements
        Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused output key; a map task is single-threaded, so reusing one
    // Writable per task avoids a per-record allocation.
    private Text domain;

    // Constant count of 1 emitted for every parsed URL.
    private LongWritable one;

    @Override
    public void configure(JobConf conf) {
        // Allocate the reusable Writables once per task.
        domain = new Text();
        one = new LongWritable(1);
    }

    @Override
    public void close() throws IOException {
        // No resources to release.
    }

    /**
     * Emits (host, 1) for the URL on this input line.
     *
     * @param key      byte offset of the line (unused)
     * @param value    the line of text holding one URL
     * @param output   collector receiving (host, 1) pairs
     * @param reporter progress reporter (unused)
     */
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        String url = value.toString().trim();
        String host = ParseDomain(url);
        // Empty host means "unparsable or host-less URL" — skip it.
        if (!host.isEmpty()) {
            domain.set(host);
            output.collect(domain, one);
        }
    }

    /**
     * Extracts the host part of a URL.
     *
     * <p>Bug fix: {@code URI.getHost()} returns {@code null} for URIs
     * without an authority component (e.g. a bare word like "foo"); the
     * previous version passed that null to {@code Text.set(String)},
     * which throws NullPointerException and kills the map task. A null
     * host is now normalized to the empty string.
     *
     * @param url the raw URL string
     * @return the host, or "" when the URL is malformed or has no host
     */
    public String ParseDomain(String url) {
        try {
            String host = URI.create(url).getHost();
            return host == null ? "" : host;
        } catch (Exception e) {
            // Malformed URL: signal "no domain" with an empty string.
            return "";
        }
    }
}
Reducer
package com.keseek.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Sums the per-domain counts produced by the mapper, emitting one
 * (domain, total) pair per distinct domain. Also usable as a combiner,
 * since summation is associative and commutative.
 */
public class DomainCountReducer implements
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void configure(JobConf conf) {
        // Nothing to configure.
    }

    @Override
    public void close() throws IOException {
        // Nothing to clean up.
    }

    /**
     * Adds up all counts for one domain and emits the total.
     *
     * @param key      the domain name
     * @param values   the partial counts to sum
     * @param output   collector receiving (domain, total)
     * @param reporter progress reporter (unused)
     */
    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        long total = 0;
        for (Iterator<LongWritable> it = values; it.hasNext(); ) {
            total += it.next().get();
        }
        output.collect(key, new LongWritable(total));
    }
}
Main
package com.keseek.hadoop; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class DomainCountMain { public static void main(String[] args) throws Exception { // Param for path if (args.length != 2) { System.out.println("Usage:"); System.out .println("DomainCountMain.jar <Input_Path> <Outpu_Path>"); System.exit(-1); } // Configure JobConf JobConf jobconf = new JobConf(DomainCountMain.class); jobconf.setJobName("Domain Counter by Coder4"); FileInputFormat.setInputPaths(jobconf, new Path(args[0])); FileOutputFormat.setOutputPath(jobconf, new Path(args[1])); jobconf.setInputFormat(TextInputFormat.class); jobconf.setOutputFormat(TextOutputFormat.class); jobconf.setMapperClass(DomainCountMapper.class); jobconf.setReducerClass(DomainCountReducer.class); jobconf.setCombinerClass(DomainCountReducer.class); jobconf.setMapOutputKeyClass(Text.class); jobconf.setMapOutputValueClass(LongWritable.class); jobconf.setOutputKeyClass(Text.class); jobconf.setOutputValueClass(LongWritable.class); // Run job RunningJob run = JobClient.runJob(jobconf); run.waitForCompletion(); if (run.isSuccessful()) { System.out.println("<<<DomainCount Main>>> success."); } else { System.out.println("<<<DomainCount Main>>> error."); } } }